大数据基础

pyspark-RDD基础

spark

1
pyspark是spark的python API,允许python调用spark编程模型

初始化spark

SparkContext

1
2
from pyspark import SparkContext
sc = SparkContext(master='local[2]')

核查SparkContext

1
2
3
4
5
6
7
8
9
sc.version									获取SparkContext的版本
sc.pythonVer 获取python版本
sc.master 要连接的Master URL
str(sc.sparkHome) spark工作节点的安装路径
str(sc.sparkUser()) 获取SparkContext的spark用户名
sc.appName 返回应用名称
sc.applicationId 返回应用程序ID
sc.defaultParallelism 返回默认并行级别
sc.defaultMinPatitions RDD默认最小分区数

配置

1
2
3
from pyspark import SparkConf,SparkContext
conf = (SparkConf().setMaster("local").setAppName("my APP").set("spark.executor.memory","1g"))
sc = SparkContext(conf=conf)

使用shell

pyspark shell已经为SparkContext创建了名为sc的变量

1
2
./bin/spark-shell --master local[2]
./bin/pyspark --master local[4] --py-files code.py

用—master参数设定Context连接到哪个Master服务器,通过传递逗号分隔列表至—py-files添加Python.zip、egg或.py文件到Runtime路径

加载数据

并行集合

1
2
3
4
rdd = sc.parallelize([('a',7),('a',2),('b',2)])
rdd2 = sc.parallelize([('a',2),('d',1),('b',1)])
rdd3 = sc.parallelize(range(100))
rdd4 = sc.parallelize([("a",["x","y","z"]),("b",["p","r"])])

外部数据

使用textFile()函数从HDFS、本地文件或其它支持hadoop的文件系统里读取文件,或使用wholeTextFiles()函数读取目录下所有文本文件

1
2
textFile = sc.textFile('a.txt')
textFile2 = sc.wholeTextFiles(/aa)

提取RDD信息

基础信息

1
2
3
4
5
6
7
8
9
10
11
rdd.getNumPatitions()								列出分区数
rdd.count() 计算RDD的实例数量
rdd.countByKey() 按键计算RDD实例数量
defaultdict(<type 'int'>,('a':2,'b':1))
rdd.countByValue() 按值计算RDD实例数量
defaultdict(<type 'int'>,(('b',2):1,('a',2):1,('a',7):1))
rdd.collectAsMap() 以字典的形式返回键值
('a':2,'b':2)
rdd.sum() 汇总RDD元素
4959
sc.parallelize([]).isEmpty() 检查RDD是否为空

汇总

1
2
3
4
5
6
7
rdd.max()					RDD元素的最大值
rdd.min() RDD元素的最小值
rdd.mean() RDD元素的平均值
rdd.stdev() RDD元素的标准差
rdd.variance() RDD元素的方差
rdd.histogram(3) 分箱(bin)生成直方图
rdd.stats() 综合统计包括:计数、平均值、标准差、最大值和最小值

应用函数

1
2
3
4
rdd.map(lambda x:x+(x[1],x[0])).collect()		对每个RDD元素执行函数
rdd.flatMap(lambda x:x+(x[1],x[0])) 对每个RDD元素执行函数,并拉平结果
rdd.collect()
rdd.flatMapValues(lambda x:x).collect() 不改变键,对rdd的每个键值对执行flatMap函数

选择数据

1
2
3
4
5
6
7
8
9
10
11
获取
rdd.collect() 返回包含所以RDD元素的列表
rdd.take(4) 提取前4个RDD元素
rdd.first() 提取第一个RDD元素
rdd.top(2) 提取前两个RDD元素
抽样
rdd.sample(False,0.15,81) 返回RDD的采样子集
筛选
rdd.filter(lambda x:'a' in x) 筛选RDD
rdd.distinct() 返回RDD里的唯一值
rdd.keys() 返回RDD键值对里的键

迭代

1
2
def g(x):print(x)     
rdd.foreach(g)

改变数据形状

1
2
3
4
5
6
7
8
9
10
11
12
13
14
规约
rdd.reduceByKey(lambda x,y:x+y) 合并每个键的值
rdd.reduce(lambda x,y:x+y) 合并RDD的值
分组
rdd.groupBy(lambda x:x%2).mapValues(list) 返回RDD的分组值
rdd.groupByKey().mapValues(list) 按键分组RDD
集合
seqOp = (lambda x,y:(x[0]+y,x[1]+1))
combOP = (lambda x,y:(x[0]+y[0],x[1]+y[1]))
rdd.aggregate((0,0),seqOp,combOP) 汇总每个分区里的RDD元素,并输出结果
rdd.aggregeteByKey((0,0),seqOp,combOP) 汇总每个RDD的键的值
rdd.fold(0,add) 汇总每个分区里的RDD元素,并输出结果
rdd.foldByKey(0,add) 合并每个键的值
rdd,keyBy(lambda x:x+x) 通过执行函数,创建RDD元素的元组

数学运算

1
2
3
rdd.subtract(rdd2)							返回RDD2里没有匹配键的rdd的兼职对
rdd2.subtractByKey(rdd) 返回rdd2里的每个(键、值)对,rdd中,没有匹配的键
rdd.cartesian(rdd2) 返回rdd和rdd2的笛卡尔积

排序

1
2
rdd.sortBy(lambda x:x[1])					按给定函数排序RDD
rdd.sortByKey() 按键排序RDD的键值对

重分区

1
2
rdd.repartition(4)							新建一个含4个分区的RDD
rdd.coalesce(1) 将RDD中的分区数缩减为1个

保存

1
2
rdd.saveAsTextFile("rdd.txt")
rdd.saveAsHadoopFile("hdfs://namenodehost/parent/child",'org.apache.hadoop.mapred.TextOutputFormat')

终止SparkContext

1
sc.stop()

执行程序

1
./bin/spark-submit examples/src/main/python/pi.py

Pyspark_sql

Pyspark与Spark SQL

Spark SQL是Apache Spark处理结构化数据的模块

初始化SparkSession

SparkSession用于创建数据框,将数据框注册为表,执行SQL查询,缓存表及读取Parquet文件

1
2
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("my app").config("spark.some.config.option","some-value").getOrCreate()

创建数据框

从RDD创建
1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark.sql.types import *
推断Schema
sc = spark.sparkContext
lines = sc.textFile("people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
peopledf = spark.createDataFrame(people)
指定Schema
people = parts.map(lambda p:Row(name=p[0],age=int(p[1].strip())))
schemaString = "name age"
fields = [StructField(field_name,StringType(),True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(people,schema).show()
从spark数据源创建
1
2
3
4
5
6
7
8
json
df = spark.read.json("customer.json")
df.show()
df2 = spark.read.load("people.json",format = "json")
Parquet文件
df3 = spark.read.load("users.parquet")
文本文件
df4 = spark.read.text("people.txt")
查阅数据信息
1
2
3
4
5
6
7
8
9
10
11
12
df.dtypes						返回df的列名与数据类型
df.show() 显示df内容
df.head() 返回前n行数据
df.first() 返回第一行数据
df.take(2) 返回前两行数据
df.schema 返回df的schema
df.describe().show() 汇总统计数据
df.columns 返回df列名
df.count() 返回df的行数
df.distinct().count() 返回df中不重复的行数
df.printSchema() 返回df的Schema
df.explain() 返回逻辑与实体方案
重复值
1
df = df.dropDuplicates()
查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from pyspark.sql import functions as F
Select
df.select("firstName").show() 显示firstName列的所有条目
df.select("firstName","lastName".show())
df.select("firstName","age",\
explode("phoneNumber")\ 显示firstName、age的所有条目和类型
.alias("contactInfo"))\
.select("ContactInfo.type","firstName","age")
df.select(df["firstName"],df["age"]+1).show() 显示firstName和age列的所有记录添加
df.select(df["age"]>24).show() 显示所有小于24的记录
When
df.select("firstName",F.when(df.age>30,1))\ 显示firstName,且大于30岁显示1,小于30显示0
.otherwise(0).show()
df[df.firstName.isin("Jane","Boris")].collect() 显示符合特定条件的firstName列的记录
Like
df.select("firstName",df.lastName,\ 显示lastName列中包含Smith的firstName列的记录
like("Smith")).show()
Startswith-Endwith
df.select("firstName",df.lastName.\ 显示lastName列中以Sm开头的firstName列的记录
startswith("Sm")).show()
df.select(df.lastName.endswith("th")).show() 显示以th结尾的lastName
Substring
df.select(df.firstName.substr(1,3).alias("name"))返回firstName的子字符串
Between
df.select(df.age.between(22,24)).show() 显示介于2224直接的age列的所有记录

添加、修改、删除列

添加列
1
2
3
4
5
6
df = df.withColumn('city',df.address.city) \
.withColumn('postalCode',df.address.postalCode) \
.withColumn('state',df.address.state) \
.withColumn('streetAddress',df.address.streetAddress) \
.withColumn('telePhoneNumber',explode(df.phoneNumber.number)) \
.withColumn('telePhoneType',explode(df.phoneNumber.type)) \
修改列
1
df = df.withColumnRenamed('telePhoneNumber','phoneNumber')
删除列
1
2
df = df.drop("address","phoneNumber")
df = df.drop(df.address).drop(df.phoneNumber)
分组
1
df.groupBy("age").count().show()		按age列分组,统计每组人数
筛选
1
df.filter(df["age"]>24).show()			按age列筛选,保留年龄大于24岁的
排序
1
2
3
peopledf.sort(peopledf.age.desc()).collect()
df.sort("age",ascending=False).collect()
df.orderBy(["age","city"],ascending=[0,1]).collect()
替换缺失值
1
2
3
df.na.fill(50).show()				用一个值替换空值
df.na.drop().show() 去除df中为空值的行
df.na.replace(10,20).show() 用一个值去替换另一个值
重分区
1
2
df.repartition(10).rdd.getNumPartitions()	将df拆分为10个分区
df.coalesce(1).rdd.getNumPartitions() 将df合并为1个分区

运行SQL查询

将数据框注册为视图
1
2
3
peopledf.createGlobalTempView("people")
df.createTempView("customer")
df.createOrReplaceTempView("customer")
查询视图
1
2
df = spark.sql("select * from customer").show()
peopledf = spark.sql("select * from global_temp.people").show()

输出

数据结构
1
2
3
rdd1 = df.rdd		将df转为rdd
df.toJSON().first() 将df转为rdd字符串
df.toPandas() 将df的内容转为Pandas的数据框
保存至文件
1
2
df.select("firstName","city").write.save("nameAndCity.parquet")
df.select("firstName","age").write.save("nameAndAges.json",format="json")
终止SparkSession
1
spark.stop()
文章目录
  1. 1. pyspark-RDD基础
    1. 1.0.1. spark
    2. 1.0.2. 初始化spark
      1. 1.0.2.1. SparkContext
      2. 1.0.2.2. 核查SparkContext
      3. 1.0.2.3. 配置
      4. 1.0.2.4. 使用shell
    3. 1.0.3. 加载数据
      1. 1.0.3.1. 并行集合
      2. 1.0.3.2. 外部数据
    4. 1.0.4. 提取RDD信息
      1. 1.0.4.1. 基础信息
      2. 1.0.4.2. 汇总
      3. 1.0.4.3. 应用函数
      4. 1.0.4.4. 选择数据
      5. 1.0.4.5. 迭代
      6. 1.0.4.6. 改变数据形状
      7. 1.0.4.7. 数学运算
      8. 1.0.4.8. 排序
      9. 1.0.4.9. 重分区
      10. 1.0.4.10. 保存
      11. 1.0.4.11. 终止SparkContext
      12. 1.0.4.12. 执行程序
  • 2. Pyspark_sql
    1. 2.0.0.1. Pyspark与Spark SQL
    2. 2.0.0.2. 初始化SparkSession
    3. 2.0.0.3. 创建数据框
      1. 2.0.0.3.1. 从RDD创建
      2. 2.0.0.3.2. 从spark数据源创建
      3. 2.0.0.3.3. 查阅数据信息
      4. 2.0.0.3.4. 重复值
      5. 2.0.0.3.5. 查询
    4. 2.0.0.4. 添加、修改、删除列
      1. 2.0.0.4.1. 添加列
      2. 2.0.0.4.2. 修改列
      3. 2.0.0.4.3. 删除列
      4. 2.0.0.4.4. 分组
      5. 2.0.0.4.5. 筛选
      6. 2.0.0.4.6. 排序
      7. 2.0.0.4.7. 替换缺失值
      8. 2.0.0.4.8. 重分区
    5. 2.0.0.5. 运行SQL查询
      1. 2.0.0.5.1. 将数据框注册为视图
      2. 2.0.0.5.2. 查询视图
    6. 2.0.0.6. 输出
      1. 2.0.0.6.1. 数据结构
      2. 2.0.0.6.2. 保存至文件
      3. 2.0.0.6.3. 终止SparkSession
  • |