《pyspark实战指南》阅读笔记

读了《pyspark实战指南》的前几章,关于数据分析的部分暂时还没看…

SparkSession本质上是以下这些的组合

1
2
3
4
5
6
7
{
SparkConf
SparkContext
SQLContext
HiveContext
StreamingContext
}

pyspark中两种方式创建RDD

1
2
1> rdd = sc.parallelize([('c','v'),('c2','v2'),('c3','v3')]
2> rdd = sc.textFile('py/xxx/sdsd.txt.gz',10)

该数据集被划分为10个分区(经验:把每1个集群中的数据集分成2-4个分区)

spark可以自动处理压缩数据集

根据数据读取方式的不同,持有的对象以略有不同的形式存在。

从文件中读取的对象为MapPartitionsRDD;
以paralellize(…)方法对一个集合进行操作时的对象为 ParallelCollectionRDD。

.collect() 和 .glom()

.collect() 会将数据集送回到驱动程序(driver),驱动程序将其序列化为一个列表。
.glom()和.collect()配合使用会产生一个列表(该列表大小即分区数量),其中每个元素是指定分区中的所有元素的另一个列表

1
2
3
4
5
>>> rdd2=sc.parallelize([('a',4),('a',1),('b',8),('d',15)],3)
>>> rdd2.glom().collect()
[[('a', 4)], [('a', 1)], [('b', 8), ('d', 15)]]
>>> len(rdd2.glom().collect())
3

.collect() 和 .take() 和 .takeSample(…)

1
2
3
4
5
.take(n) 只返回单个分区的前n行,.collect()是返回整个RDD
.takeSample(param1,param2,param3)
- param1 代表采样是否应该被替换 (False)
- param2 指定要返回的记录数量 (1)
- param3 伪随机数发生器的种子 (667)

谨记,定义纯Python方法会降低应用程序的速度,因为Spark会在Python解释器和JVM之间连续切换。所以应该尽可能用Spark的API。

.flatMap(…)方法和.map(…)方法类似。但是

.flatMap(…)返回的是一个扁平的结果,而不是一个列表。
比如:

1
2
3
4
5
6
>>> list=[(0,1),(1,2),(2,3)]
>>> rdd=sc.parallelize(list)
>>> rdd.map(lambda x:(x[0],x[1])).collect()
[(0, 1), (1, 2), (2, 3)]
>>> rdd.flatMap(lambda x:(x[0],x[1])).collect()
[0, 1, 1, 2, 2, 3]

rdd中高开销的方法

1
2
3
1. .distinct()
2. rdd1.leftOuterJoin(rdd2)
3. .repartition(...)

rdd 连接

左外连接 leftOuterJoin 和 内连接join

1
2
3
4
5
6
>>> rdd1=sc.parallelize([('a',1),('b',4),('c',10)])
>>> rdd2=sc.parallelize([('a',4),('a',3),('b',8),('d',15)])
>>> rdd1.leftOuterJoin(rdd2).collect()
[('b', (4, 8)), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 3))]
>>> rdd1.join(rdd2).collect()
[('b', (4, 8)), ('a', (1, 4)), ('a', (1, 3))]

返回两个RDD中相等的数据 intersection

1
2
3
4
>>> rdd1=sc.parallelize([('a',1),('b',4),('c',10)])
>>> rdd2=sc.parallelize([('a',4),('a',1),('b',8),('d',15)])
>>> rdd1.intersection(rdd2).collect()
[('a', 1)]

.repartition(…)转换

重新对数据集进行分区,改变数据集分区的数量。此功能应该谨慎使用。因为它会重组数据,导致对性能方面产生巨大的影响。

1
2
3
4
5
6
7
8
9
10
>>> rdd2=sc.parallelize([('a',4),('a',1),('b',8),('d',15)],3)	# 3个分区
>>> rdd2.glom().collect()
[[('a', 4)], [('a', 1)], [('b', 8), ('d', 15)]]
>>> len(rdd2.glom().collect())
3
>>> rdd3=rdd2.repartition(4) # 4个分区
>>> rdd3.glom().collect()
[[('b', 8), ('d', 15)], [('a', 1)], [], [('a', 4)]]
>>> len(rdd3.glom().collect())
4

.reduce(…) 和 .reduceByKey(…)

1
2
3
4
5
6
>>> rdd2.collect()          
[('a', 4), ('a', 1), ('b', 8), ('d', 15)]
>>> rdd2.map(lambda row : row[1]).reduce(lambda x,y:x+y)
28
>>> rdd2.reduceByKey(lambda x,y:x+y).collect()
[('d', 15), ('b', 8), ('a', 5)]

.count() 、 .countByKey() 和 .countByValue()

.count()获取数据集总数,不需要把整个数据集移动到驱动程序
.countByKey().items()

1
2
3
4
5
6
7
8
>>> rdd2.collect()             
[('a', 4), ('a', 1), ('b', 8), ('d', 15)]
>>> rdd2.count()
4
>>> rdd2.countByKey().items()
dict_items([('a', 2), ('b', 1), ('d', 1)])
>>> rdd2.countByValue().items()
dict_items([(('d', 15), 1), (('a', 1), 1), (('a', 4), 1), (('b', 8), 1)])

.saveAsTextFile(…)方法

将RDD保存为文本文件,一个分区一个文件,以行为单位

.foreach(…)方法

该方法对RDD里的每个元素,用迭代的方式应用相同的函数;和map()比较,foreach是按照一个接一个的方式,对每一条记录应用一个定义好的函数。
多用于保存到数据库

1
2
3
def f(x):
print(x)
rdd.foreach(f)

打印所有记录,其中多次打印,每次的顺序可能不同

Python RDD 比 Scala的RDD慢很多;DataFrame 性能接近。

Python UDF会导致Python和Java虚拟机之间的往返通信。(性能低下)

将 DataFrame注册为表

Spark 2.x createOrReplaceTempView()
Spark 1.x registerTempTable()[已废弃]

DataFrame filter

1
2
3
4
5
>>> df.select("name","age").filter("name like 'B%'").show()
name | age
------------
Bill | 23
Bob | 22

DataFrame 删除重复的行

1
df.dropDuplicates()

df 去重(除id列以外)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
>>> list=[(1,'frank',14,'男'),(2,'frank',14,'男'),(4,'bill',16,'男'),(4,'tom',24,'男'),(5,'lucy',16,'女'),(6,'celly',17,'女')]
>>> rdd = sc.parallelize(list)
>>> df = spark.createDataFrame(rdd,['id','name','age','gender'])
>>> df.show()
+---+-----+---+------+
| id| name|age|gender|
+---+-----+---+------+
| 1|frank| 14| 男|
| 2|frank| 14| 男|
| 4| bill| 16| 男|
| 4| tom| 24| 男|
| 5| lucy| 16| 女|
| 6|celly| 17| 女|
+---+-----+---+------+

>>> df.select([c for c in df.columns if c!='id']).distinct().show()
+-----+---+------+
| name|age|gender|
+-----+---+------+
| tom| 24| 男|
| bill| 16| 男|
|celly| 17| 女|
| lucy| 16| 女|
|frank| 14| 男|
+-----+---+------+

将重复数据行从原有DF中去掉

1
2
3
4
5
6
7
8
9
10
11
>>> df=df.dropDuplicates(subset=[c for c in df.columns if c!='id'])
>>> df.show()
+---+-----+---+------+
| id| name|age|gender|
+---+-----+---+------+
| 4| tom| 24| 男|
| 4| bill| 16| 男|
| 6|celly| 17| 女|
| 5| lucy| 16| 女|
| 1|frank| 14| 男|
+---+-----+---+------+

.dropDuplicates(…) subset 参数指定需要处理的列

加上新的id列

1
2
3
4
5
6
7
8
9
10
11
>>> import pyspark.sql.functions as fn
>>> df.withColumn('new_id',fn.monotonically_increasing_id()).show()
+---+-----+---+------+-------------+
| id| name|age|gender| new_id|
+---+-----+---+------+-------------+
| 4| tom| 24| 男| 386547056640|
| 3| bill| 16| 男| 498216206336|
| 6|celly| 17| 女|1262720385024|
| 5| lucy| 16| 女|1408749273088|
| 1|frank| 14| 男|1666447310848|
+---+-----+---+------+-------------+

.monotonically_increasing_id()方法给每一条记录提供了一个唯一并且递增的ID。
如果你的数据存放位置在大约不到10亿个分区中,每个分区的记录少于8亿条,ID就能保证是唯一的。

ps:在Spark2.0之前的版本中同一个DF 用.monotonically_increasing_id()调用多次未必会返回相同的ID,这在2.0中已经修复。

计算id的总数和 id的唯一个数,使用 .agg(…)方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
>>> df = spark.createDataFrame(rdd,['id','name','age','gender'])
>>> df.show()
+---+-----+---+------+
| id| name|age|gender|
+---+-----+---+------+
| 1|frank| 14| 男|
| 2|frank| 14| 男|
| 4| bill| 16| 男|
| 4| tom| 24| 男|
| 5| lucy| 16| 女|
| 6|celly| 17| 女|
+---+-----+---+------+

>>> import pyspark.sql.functions as fn
>>> df.agg(
... fn.count('id').alias('id_count'),
... fn.countDistinct('id').alias('id_distinct_count')
... ).show()
+--------+-----------------+
|id_count|id_distinct_count|
+--------+-----------------+
| 6| 5|
+--------+-----------------+

统计每行的空值数量

1
2
3
4
5
6
7
>>> list=[(1,'frank',14,'男'),(2,'frank',14,'男'),(3,'','',''),(4,'tom',24,''),(5,'lucy','',''),(6,'celly',17,'')]
>>> rdd = sc.parallelize(list)
>>> df=spark.createDataFrame(rdd,['id','name','age','gender'])
>>> df.rdd.map(lambda row:(row['id'],sum(c==None for c in row))).collect()
[(1, 0), (2, 0), (3, 1), (4, 0), (5, 1), (6, 0)]
>>> df.rdd.map(lambda row:(row['id'],sum(c=='' for c in row))).collect()
[(1, 0), (2, 0), (3, 2), (4, 1), (5, 1), (6, 1)]

ps: long类型空值为null(None),string空值为空字符串

获取路径作为表的一个字段

hdfs存储路径如下:

1
2
3
/tmp/aaa/a.parquet
/tmp/bbb/b.parquet
/tmp/ccc/c.parquet

其中 假设*.parquet文件中存放的是不同组的特征信息,而 aaabbbccc是这各组特征文件的组名,而我们使用以下方式读取所有数据的时候,
获取到的df是不会包含 aaa等这些分组名的。

1
df = spark.read.parquet('/tmp/*/p*')

那么如何获文件路径,并加到表中呢?
我们可以使用 functions中的input_file_name()方法,给每一行新增一个列,列值为该行所在数据文件的path,我们可以对path进行截取,从而获得分组名称。此处就不截取啦,因为有些场景需要用到全路径。

1
2
3
from pyspark.sql import functions as fn
df = spark.read.parquet('/tmp/*/p*')
df = df.withColumn("input", fn.input_file_name())

假如文件是text格式,我们还可以怎么做呢?
下面是sparkContext的一个api, 可以读取路径下的所有文件(一般用来读大量小文件),返回

1
wholeTextFiles(path, minPartitions=None, use_unicode=True

具体调用如下

1
2
rdd = sc.wholeTextFiles('/tmp/*')
rdd.collect()

path 可以这样指定: /tmp/*/tmp/*/*.parquet
返回的rdd是一个元组,key为文件路径;value为该文件的数据。