pyspark-dataframe-common-operations

Topics covered in this post

  1. Creating a DataFrame
  2. Basic DataFrame operations

I. Creating a DataFrame from other data types

  1. list[tuple] -> DataFrame, without specifying a schema

    >>> l = [('Alice', 1)]
    >>> spark.createDataFrame(l).collect()
    [Row(_1=u'Alice', _2=1)]
  2. list[tuple] -> DataFrame, with a schema that sets the column names

    >>> spark.createDataFrame(l, ['name', 'age']).collect()
    [Row(name=u'Alice', age=1)]
  3. list[dict] -> DataFrame, no schema needed

    >>> d = [{'name': 'Alice', 'age': 1}]
    >>> spark.createDataFrame(d).collect()
    [Row(age=1, name=u'Alice')]
  4. RDD -> DataFrame, without specifying a schema

    >>> rdd = sc.parallelize(l) # reused below
    >>> spark.createDataFrame(rdd).collect()
    [Row(_1=u'Alice', _2=1)]
  5. RDD -> DataFrame, with a schema

    >>> df = spark.createDataFrame(rdd, ['name', 'age'])
    >>> df.collect()
    [Row(name=u'Alice', age=1)]
  6. RDD -> Row (with column names) -> DataFrame

    >>> from pyspark.sql import Row
    >>> Person = Row('name', 'age')
    >>> person = rdd.map(lambda r: Person(*r))
    >>> df2 = spark.createDataFrame(person)
    >>> df2.collect()
    [Row(name=u'Alice', age=1)]
  7. RDD -> DataFrame, schema defined with StructType

    >>> from pyspark.sql.types import *
    >>> schema = StructType([
    ...     StructField("name", StringType(), True),
    ...     StructField("age", IntegerType(), True)])
    >>> df3 = spark.createDataFrame(rdd, schema)
    >>> df3.collect()
    [Row(name=u'Alice', age=1)]
  8. DF -> Pandas -> DF

    >>> spark.createDataFrame(df.toPandas()).collect()
    [Row(name=u'Alice', age=1)]
  9. Pandas DF -> Spark DF

    >>> import pandas
    >>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
    [Row(0=1, 1=2)]
  10. RDD -> DataFrame, schema given as a string: "name: type" pairs or just "type"

    >>> spark.createDataFrame(rdd, "a: string, b: int").collect()
    [Row(a=u'Alice', b=1)]
  11. RDD reduced to a single column -> DataFrame of int

    >>> rdd = rdd.map(lambda row: row[1])
    >>> spark.createDataFrame(rdd, "int").collect()
    [Row(value=1)]
  12. RDD column comparison -> DataFrame of boolean

    >>> l = [('Alice', 1)]
    >>> rdd = sc.parallelize(l).map(lambda x: x[1]==1)
    >>> spark.createDataFrame(rdd, "boolean").collect()
    [Row(value=True)]

II. Creating a DataFrame from a file

  1. From a JSON file (a variant with an explicit schema is sketched below)
    df = spark.read.format('json').load('py/test/sql/people.json')

    df = spark.read.json('py/test/sql/people.json')
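
    Instead of letting Spark infer the schema, you can also pass one explicitly. A minimal sketch, assuming people.json holds name/age records like the earlier examples:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # assumed layout of people.json: name (string) and age (long)
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", LongType(), True)])
    df = spark.read.json('py/test/sql/people.json', schema=schema)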

III. Inspecting basic DataFrame properties

  1. Check the column types

    >>> l = [('Alice', 1)]   
    >>> rdd = sc.parallelize(l).map(lambda x: x[1])
    >>> df = spark.createDataFrame(rdd, "int")
    >>> df.dtypes
    [('value', 'int')]

    >>> df.printSchema()
    root
    |-- value: integer (nullable = true)

    >>> df.schema
    StructType(List(StructField(value,IntegerType,true)))
  2. List just the column names

    >>> df.columns
    ['value']
  3. Count the rows

    >>> df.count()
    1
  4. Sort by a given column, ascending or descending (sort, sortWithinPartitions)

    >>> df = spark.createDataFrame(data=[("Alberto", 14), ("Dakota", 12),("Bill",23),("Matthew",24)],schema=['name','age'])
    >>> df.printSchema()
    root
    |-- name: string (nullable = true)
    |-- age: long (nullable = true)


    >>> df.sort(df.age.desc()).show()
    +-------+---+
    | name|age|
    +-------+---+
    |Matthew| 24|
    | Bill| 23|
    |Alberto| 14|
    | Dakota| 12|
    +-------+---+

    >>> df.sort(df.age).show() # ascending by default
    +-------+---+
    | name|age|
    +-------+---+
    | Dakota| 12|
    |Alberto| 14|
    | Bill| 23|
    |Matthew| 24|
    +-------+---+

    >>> df.sortWithinPartitions("age", ascending=False).show() # sorts only inside each partition, so the output is not globally ordered (see the sketch below)
    +-------+---+
    | name|age|
    +-------+---+
    |Alberto| 14|
    | Dakota| 12|
    | Bill| 23|
    |Matthew| 24|
    +-------+---+
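
    sortWithinPartitions only orders rows inside each partition, which is why the output above looks unsorted. A minimal sketch to make the effect visible, assuming the same df; for a global ordering use sort()/orderBy() instead:

    # put the rows into 2 partitions, then sort each partition independently
    df.repartition(2).sortWithinPartitions("age", ascending=False).show()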

IV. Common operations on missing values

Detecting null and NaN

  1. Checking for None (null)

    >>> from pyspark.sql.functions import isnull, isnan
    >>> df = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
    >>> df.show()
    +----+----+
    | a| b|
    +----+----+
    | 1|null|
    |null| 2|
    +----+----+
    >>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).show()
    +-----+-----+
    | r1| r2|
    +-----+-----+
    |false|false|
    | true| true|
    +-----+-----+
  2. Checking for NaN

    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
    >>> df.show()
    +---+---+
    | a| b|
    +---+---+
    |1.0|NaN|
    |NaN|2.0|
    +---+---+

    >>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).show()
    +-----+-----+
    | r1| r2|
    +-----+-----+
    |false|false|
    | true| true|
    +-----+-----+

Handling missing (null) values: dropping and filling

Prepare the data:

employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28),(5, "Kevin", 26),(6, "Vincent", 35)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])
salary=[(1,1000),(2,2000),(3,3000),(4,4000)]
salary=spark.createDataFrame(salary, schema=["emp_id","salary"])
lang=[(1,"java"),(3,"html"),(4,"hadoop"),(5,"C#")]
lang=spark.createDataFrame(lang, schema=["emp_id","lang"])

# the left join duplicates the 'emp_id' column, so a trailing select keeps only the columns we need
>>> df = employees.join(salary, employees.emp_id == salary.emp_id,how='left').join(lang, employees.emp_id == lang.emp_id,how='left').select(employees.emp_id,"name","age","salary","lang")
>>> df.show()
+------+-------+---+------+------+
|emp_id| name|age|salary| lang|
+------+-------+---+------+------+
| 6|Vincent| 35| null| null|
| 5| Kevin| 26| null| C#|
| 1| John| 25| 1000| java|
| 3| Mike| 24| 3000| html|
| 2| Ray| 35| 2000| null|
| 4| Jane| 28| 4000|hadoop|
+------+-------+---+------+------+

# if the join key has the same name on both sides, joining with on= avoids the duplicate column
df = employees.join(salary, on='emp_id', how='left').join(lang, on='emp_id', how='left')
df.show()
+------+-------+---+------+------+
|emp_id| name|age|salary| lang|
+------+-------+---+------+------+
| 6|Vincent| 35| null| null|
| 5| Kevin| 26| null| C#|
| 1| John| 25| 1000| java|
| 3| Mike| 24| 3000| html|
| 2| Ray| 35| 2000| null|
| 4| Jane| 28| 4000|hadoop|
+------+-------+---+------+------+

  1. Drop every row that contains a missing value

    >>> df.dropna().show()
    # or equivalently
    >>> df.na.drop().show()
    +------+----+---+------+------+
    |emp_id|name|age|salary| lang|
    +------+----+---+------+------+
    | 1|John| 25| 1000| java|
    | 3|Mike| 24| 3000| html|
    | 4|Jane| 28| 4000|hadoop|
    +------+----+---+------+------+
  2. Drop rows with fewer than 2 non-null values (thresh=2). Nothing is dropped here because every row has at least 2 non-null values; see the sketch after the output.

    >>> df.na.drop(thresh=2).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| null| null|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+
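
    To see thresh actually remove rows, raise it; thresh counts non-null values per row, and emp_id, name and age are never null here. A minimal sketch:

    # keep only rows with at least 4 non-null values:
    # Vincent's row (3 non-null values) is dropped, all the others (4 or more) are kept
    df.na.drop(thresh=4).show()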
  3. Drop rows where salary is null

    >>> from pyspark.sql.functions import isnull
    >>> df.filter(isnull(df.salary) == False).show()
    +------+----+---+------+------+
    |emp_id|name|age|salary| lang|
    +------+----+---+------+------+
    | 1|John| 25| 1000| java|
    | 3|Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4|Jane| 28| 4000|hadoop|
    +------+----+---+------+------+
  4. Drop rows where either of two columns is null

    >>> df.filter(isnull(df.salary) == False).filter(isnull(df.lang) == False).show()
    +------+----+---+------+------+
    |emp_id|name|age|salary| lang|
    +------+----+---+------+------+
    | 1|John| 25| 1000| java|
    | 3|Mike| 24| 3000| html|
    | 4|Jane| 28| 4000|hadoop|
    +------+----+---+------+------+
  5. Fill a column's missing values with that column's mean

    >>> from pyspark.sql import functions as func
    >>> mean_salary = df.select(func.mean('salary')).collect()[0][0]
    >>> clean_data = df.na.fill({'salary': mean_salary})
    >>> clean_data.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| 2500| null|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+
  6. Fill different columns with different values

    >>> df.na.fill({'salary':mean_salary, 'lang':'unknown'}).show()
    +------+-------+---+------+-------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+-------+
    | 6|Vincent| 35| 2500|unknown|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000|unknown|
    | 4| Jane| 28| 4000| hadoop|
    +------+-------+---+------+-------+
  7. nanvl: if col1 is NaN, take col2, otherwise return col1 (nanvl handles NaN, not null)

    >>> from pyspark.sql.functions import nanvl
    >>> df = spark.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
    >>> df.show()
    +---+---+
    | a| b|
    +---+---+
    |1.0|NaN|
    |NaN|2.0|
    +---+---+
    >>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).show()
    +---+---+
    | r1| r2|
    +---+---+
    |1.0|1.0|
    |2.0|2.0|
    +---+---+
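
    nanvl only covers NaN. For null values the analogous tool is coalesce, which returns the first non-null argument. A minimal sketch on a null-valued frame like the one in the isnull example:

    from pyspark.sql.functions import coalesce

    df_null = spark.createDataFrame([(1, None), (None, 2)], ("a", "b"))
    # take a when it is not null, otherwise fall back to b
    df_null.select(coalesce(df_null.a, df_null.b).alias("first_non_null")).show()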

V. Common operations on duplicate rows

  1. Drop duplicate rows

    >>> from pyspark.sql import Row                                                 
    >>>
    >>> df = sc.parallelize([
    ... Row(name='Alice', age=5, height=80),
    ... Row(name='Alice', age=5, height=80),
    ... Row(name='Alice', age=10, height=80)]).toDF()
    >>> df.show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 5| 80|Alice|
    | 5| 80|Alice|
    | 10| 80|Alice|
    +---+------+-----+

    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 5| 80|Alice|
    | 10| 80|Alice|
    +---+------+-----+
  2. Deduplicate on a single column (keep one row per value of that column)

    >>> df.dropDuplicates(subset=['name']).show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 5| 80|Alice|
    +---+------+-----+

VI. agg(self, *exprs) expressions

  1. Compute the max of one column and the mean of another
    >>> df.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| 2500| null|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+

    >>> df.agg({"age": "max","salary":"avg"}).collect()
    [Row(avg(salary)=2500.0, max(age)=35)]
    >>> df.agg({"age": "max","salary":"avg"}).collect()[0]
    Row(avg(salary)=2500.0, max(age)=35)
    >>> df.agg({"age": "max","salary":"avg"}).collect()[0][0]
    2500.0

Using min(col) from pyspark.sql.functions

>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=24)]
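
The dict form of agg allows only one aggregation per column; passing several aggregates from pyspark.sql.functions, each with an alias, is more flexible. A minimal sketch:

from pyspark.sql import functions as F

# several aggregations in one pass, with readable output column names
df.agg(F.max('age').alias('max_age'),
       F.avg('salary').alias('avg_salary')).show()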

  2. Count rows per value of a given column

    >>> authors = [['Thomas','Hardy','June 2,1840'],
    ... ['Thomas','Hardy','June 2,1840'],
    ... ['Thomas','H',None],
    ... ['Jane','Austen','16 December 1775'],
    ... ['Emily',None,None]]
    >>> df1 = spark.createDataFrame(authors,schema=["FirstName","LastName","Dob"])
    >>> df1.show()
    +---------+--------+----------------+
    |FirstName|LastName| Dob|
    +---------+--------+----------------+
    | Thomas| Hardy| June 2,1840|
    | Thomas| Hardy| June 2,1840|
    | Thomas| H| null|
    | Jane| Austen|16 December 1775|
    | Emily| null| null|
    +---------+--------+----------------+

    >>> gdf = df1.groupBy(df1.FirstName)
    >>> gdf.agg({"*": "count"}).collect()
    [Row(FirstName=u'Emily', count(1)=1), Row(FirstName=u'Jane', count(1)=1), Row(FirstName=u'Thomas', count(1)=3)]
  3. Count the number of distinct values in a column

    >>> from pyspark.sql import functions as F
    >>> df.agg(F.approx_count_distinct(df.age).alias('distinct_age_num')).collect()
    [Row(distinct_age_num=5)]

    or

    >>> df.agg(F.countDistinct(df.age).alias('distinct_age_num')).collect()
    [Row(distinct_age_num=5)]
  4. Sum the distinct values of age

    >>> df.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| 2500| null|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+
    >>> df.agg(F.sumDistinct(df.age).alias('distinct_age_num')).collect()
    [Row(distinct_age_num=138)]

VII. select(self, *cols) and selectExpr(self, *expr)

  1. Select all columns

    >>> df.select('*').show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| 2500| null|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+
  2. Select specific columns

    >>> df.select('name', df.age).show()
    +-------+---+
    | name|age|
    +-------+---+
    |Vincent| 35|
    | Kevin| 26|
    | John| 25|
    | Mike| 24|
    | Ray| 35|
    | Jane| 28|
    +-------+---+
  3. Select columns and transform them

    >>> df.select(df.name, (df.age + 10).alias('age')).show()
    +-------+---+
    | name|age|
    +-------+---+
    |Vincent| 45|
    | Kevin| 36|
    | John| 35|
    | Mike| 34|
    | Ray| 45|
    | Jane| 38|
    +-------+---+
  4. Select columns by index (the first column is index 0)

    >>> df.select(df[1],df[2],df[3]).show()
    +-------+---+------+
    | name|age|salary|
    +-------+---+------+
    |Vincent| 35| 2500|
    | Kevin| 26| 2500|
    | John| 25| 1000|
    | Mike| 24| 3000|
    | Ray| 35| 2000|
    | Jane| 28| 4000|
    +-------+---+------+
  5. Compare a column against a value and select the boolean result

    >>> df.select(df.name, df.age>30).show()
    +-------+----------+
    | name|(age > 30)|
    +-------+----------+
    |Vincent| true|
    | Kevin| false|
    | John| false|
    | Mike| false|
    | Ray| true|
    | Jane| false|
    +-------+----------+
  6. selectExpr: apply SQL expressions to columns

    >>> df.selectExpr("age * 2", "age").show()
    +---------+---+
    |(age * 2)|age|
    +---------+---+
    | 70| 35|
    | 52| 26|
    | 50| 25|
    | 48| 24|
    | 70| 35|
    | 56| 28|
    +---------+---+
  7. Row-wise greatest and least across several columns

    >>> df=[(1,1000,1500,10),(2,2000,2500,3000),(3,3000,2800,1),(4,4000,3300,2000)]
    >>> df=spark.createDataFrame(df, schema=["emp_id","salary1","salary2","salary3"])
    >>> df.show()
    +------+-------+-------+-------+
    |emp_id|salary1|salary2|salary3|
    +------+-------+-------+-------+
    | 1| 1000| 1500| 10|
    | 2| 2000| 2500| 3000|
    | 3| 3000| 2800| 1|
    | 4| 4000| 3300| 2000|
    +------+-------+-------+-------+

    >>> from pyspark.sql.functions import greatest, least
    >>> df.select(greatest('salary1','salary2','salary3').alias('greatest'),
    ... least('salary1','salary2','salary3').alias('least')).show()
    +--------+-----+
    |greatest|least|
    +--------+-----+
    | 1500| 10|
    | 3000| 2000|
    | 3000| 1|
    | 4000| 2000|
    +--------+-----+
  8. split: split strings on a regular expression

    >>> from pyspark.sql.functions import split
    >>> df = spark.createDataFrame([('ab12cd',)], ['s',])
    >>> df.select(split(df.s, '[0-9]+').alias('s')).show()
    +--------+
    | s|
    +--------+
    |[ab, cd]|
    +--------+

    >>> df = spark.createDataFrame([('ab|cd',)], ['s',])
    >>> df.select(split(df.s, '\|').alias('s')).show()
    +--------+
    | s|
    +--------+
    |[ab, cd]|
    +--------+
  9. eval-like expressions (expr)
    expr takes an expression string and evaluates it as a SQL expression against the DataFrame, loosely analogous to Python's eval.

    >>> from pyspark.sql.functions import expr
    >>> df.select(expr('length(salary3) as len')).show()
    +---+
    |len|
    +---+
    | 2|
    | 4|
    | 1|
    | 4|
    +---+

VIII. filter(self, condition) and where(self, condition) are equivalent

  1. Filtering (combining conditions is sketched after the output)
    >>> df.filter(df.age > 30).show()

    or

    >>> df.filter("age > 30").show()

    or

    >>> df.where(df.age > 30).show()
    +------+-------+---+------+----+
    |emp_id| name|age|salary|lang|
    +------+-------+---+------+----+
    | 6|Vincent| 35| 2500|null|
    | 2| Ray| 35| 2000|null|
    +------+-------+---+------+----+
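
    To combine conditions, use the bitwise operators & (and), | (or) and ~ (not) and wrap each condition in parentheses; Python's and/or do not work on Columns. A minimal sketch:

    # age over 30 AND a non-null salary; note the parentheses
    df.filter((df.age > 30) & (df.salary.isNotNull())).show()

    # age under 25 OR a null lang
    df.where((df.age < 25) | (df.lang.isNull())).show()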

IX. when

1. Apply a transformation to rows that match a condition

>>> from pyspark.sql.functions import when
>>> df.select("*",when(df.age == 35, df.age + 1).alias("age2")).show()
+------+-------+---+------+------+----+
|emp_id| name|age|salary| lang|age2|
+------+-------+---+------+------+----+
| 6|Vincent| 35| 2500| null| 36|
| 5| Kevin| 26| 2500| C#|null|
| 1| John| 25| 1000| java|null|
| 3| Mike| 24| 3000| html|null|
| 2| Ray| 35| 2000| null| 36|
| 4| Jane| 28| 4000|hadoop|null|
+------+-------+---+------+------+----+


>>> df.select("*",when(df['age'] == 35, 1).otherwise(0).alias("age")).show()
+------+-------+---+------+------+---+
|emp_id| name|age|salary| lang|age|
+------+-------+---+------+------+---+
| 6|Vincent| 35| 2500| null| 1|
| 5| Kevin| 26| 2500| C#| 0|
| 1| John| 25| 1000| java| 0|
| 3| Mike| 24| 3000| html| 0|
| 2| Ray| 35| 2000| null| 1|
| 4| Jane| 28| 4000|hadoop| 0|
+------+-------+---+------+------+---+
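
when can also be chained to cover several conditions, with otherwise as the fallback. A minimal sketch that buckets age (the labels are just for illustration):

from pyspark.sql.functions import when

df.select("name", "age",
          when(df.age >= 35, "senior")
          .when(df.age >= 26, "middle")
          .otherwise("junior").alias("age_group")).show()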

X. Column operations

  1. Create a new column by combining existing columns (a typed-UDF variant is sketched after the output)

    >>> from pyspark.sql.functions import udf
    # create a UDF; for a simple lambda the return type can be omitted (it defaults to string)
    >>> concat_func = udf(lambda name,age:name+'_'+str(age))
    >>> df.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| 2500| null|
    | 5| Kevin| 26| 2500| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+

    >>> df.withColumn("name_age",concat_func(df.name, df.age)).show()
    +------+-------+---+------+------+----------+
    |emp_id| name|age|salary| lang| name_age|
    +------+-------+---+------+------+----------+
    | 6|Vincent| 35| 2500| null|Vincent_35|
    | 5| Kevin| 26| 2500| C#| Kevin_26|
    | 1| John| 25| 1000| java| John_25|
    | 3| Mike| 24| 3000| html| Mike_24|
    | 2| Ray| 35| 2000| null| Ray_35|
    | 4| Jane| 28| 4000|hadoop| Jane_28|
    +------+-------+---+------+------+----------+
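
    When a UDF returns something other than a string, it is safer to declare the return type explicitly. A minimal sketch returning an integer:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import IntegerType

    # declare the return type so Spark does not fall back to string
    name_len = udf(lambda name: len(name), IntegerType())
    df.withColumn("name_len", name_len(df.name)).show()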
  2. Create a new column derived from an existing column

    >>> df.withColumn("age_incremented",df.age+1).show()
    +------+-------+---+------+------+---------------+
    |emp_id| name|age|salary| lang|age_incremented|
    +------+-------+---+------+------+---------------+
    | 6|Vincent| 35| 2500| null| 36|
    | 5| Kevin| 26| 2500| C#| 27|
    | 1| John| 25| 1000| java| 26|
    | 3| Mike| 24| 3000| html| 25|
    | 2| Ray| 35| 2000| null| 36|
    | 4| Jane| 28| 4000|hadoop| 29|
    +------+-------+---+------+------+---------------+
  3. Create a constant column with lit

    >>> from pyspark.sql.functions import lit
    >>> df.withColumn('newCol', lit(1)).show()
    +------+-------+-------+-------+------+
    |emp_id|salary1|salary2|salary3|newCol|
    +------+-------+-------+-------+------+
    | 1| 1000| 1500| 10| 1|
    | 2| 2000| 2500| 3000| 1|
    | 3| 3000| 2800| 1| 1|
    | 4| 4000| 3300| 2000| 1|
    +------+-------+-------+-------+------+

XI. sort and orderBy are equivalent

  1. Sort by one column, ascending or descending

    >>> df.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| null| null|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+

    >>> df.sort(df.age.desc()).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 2| Ray| 35| 2000| null|
    | 6|Vincent| 35| null| null|
    | 4| Jane| 28| 4000|hadoop|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    +------+-------+---+------+------+

    >>> df.sort(df.age,ascending=False).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 2| Ray| 35| 2000| null|
    | 6|Vincent| 35| null| null|
    | 4| Jane| 28| 4000|hadoop|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    +------+-------+---+------+------+

    >>> from pyspark.sql.functions import asc
    >>> df.sort(asc("age")).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 3| Mike| 24| 3000| html|
    | 1| John| 25| 1000| java|
    | 5| Kevin| 26| null| C#|
    | 4| Jane| 28| 4000|hadoop|
    | 6|Vincent| 35| null| null|
    | 2| Ray| 35| 2000| null|
    +------+-------+---+------+------+
  2. Sort by two columns (ascending by default)

    >>> from pyspark.sql.functions import desc
    >>> df.sort(desc("age"), desc("name")).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| null| null|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    +------+-------+---+------+------+

    >>> df.orderBy(["age", "name"], ascending=[1, 0]).show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 3| Mike| 24| 3000| html|
    | 1| John| 25| 1000| java|
    | 5| Kevin| 26| null| C#|
    | 4| Jane| 28| 4000|hadoop|
    | 6|Vincent| 35| null| null|
    | 2| Ray| 35| 2000| null|
    +------+-------+---+------+------+

XII. groupBy

  1. Average every numeric column over all rows

    >>> df.show()
    +------+-------+---+------+------+
    |emp_id| name|age|salary| lang|
    +------+-------+---+------+------+
    | 6|Vincent| 35| null| null|
    | 5| Kevin| 26| null| C#|
    | 1| John| 25| 1000| java|
    | 3| Mike| 24| 3000| html|
    | 2| Ray| 35| 2000| null|
    | 4| Jane| 28| 4000|hadoop|
    +------+-------+---+------+------+

    >>> df.groupBy().avg().collect()
    [Row(avg(emp_id)=3.5, avg(age)=28.833333333333332, avg(salary)=2500.0)]
  2. Group and compute max, min and mean

    >>> authors = [['Thomas','25','June 2,1840'],
    ... ['Thomas','30','June 2,1840'],
    ... ['Thomas','14',None],
    ... ['Jane','56','16 December 1775'],
    ... ['Emily','21',None]]
    >>> df = spark.createDataFrame(authors,["name","age","Dob"])
    >>> df.show()
    +------+---+----------------+
    | name|age| Dob|
    +------+---+----------------+
    |Thomas| 25| June 2,1840|
    |Thomas| 30| June 2,1840|
    |Thomas| 14| null|
    | Jane| 56|16 December 1775|
    | Emily| 21| null|
    +------+---+----------------+

    >>> df.groupBy('name').agg({'age': 'mean'}).show()
    +------+--------+
    | name|avg(age)|
    +------+--------+
    | Emily| 21.0|
    | Jane| 56.0|
    |Thomas| 23.0|
    +------+--------+

    >>> df.groupBy('name').agg({'age': 'max'}).show()
    +------+--------+
    | name|max(age)|
    +------+--------+
    | Emily| 21|
    | Jane| 56|
    |Thomas| 30|
    +------+--------+

    >>> df.groupBy('name').agg({'age': 'min'}).show()
    +------+--------+
    | name|min(age)|
    +------+--------+
    | Emily| 21|
    | Jane| 56|
    |Thomas| 14|
    +------+--------+
  3. Count rows grouped by the given columns (several aggregations per group are sketched after the output)

    >>> df.groupBy(['name', df.age]).count().collect()
    [Row(name=u'Emily', age=u'21', count=1), Row(name=u'Thomas', age=u'30', count=1), Row(name=u'Thomas', age=u'25', count=1), Row(name=u'Thomas', age=u'14', count=1), Row(name=u'Jane', age=u'56', count=1)]
    >>> df.groupBy(['name']).count().collect()
    [Row(name=u'Emily', count=1), Row(name=u'Jane', count=1), Row(name=u'Thomas', count=3)]
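
    The dict syntax used above allows only one aggregation per column; to compute several per group, pass aggregates from pyspark.sql.functions to agg. A minimal sketch on the same authors DataFrame (age is stored as a string here, so avg casts it implicitly while min/max compare lexicographically):

    from pyspark.sql import functions as F

    df.groupBy('name').agg(F.min('age').alias('min_age'),
                           F.max('age').alias('max_age'),
                           F.avg('age').alias('avg_age')).show()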

XIII. Combining DataFrames: union, intersect, subtract

Prepare the data

listA=[('1','lisi','25'),('2','zhangs','20'),('3','liw','18'),('4','wangw','10')]
df1=spark.createDataFrame(listA,['id','name','age'])
listA=[('4','wangw','10'),('5','zhangss','1')]
df2=spark.createDataFrame(listA,['id','name','age'])
listA=[('7','ggg','18')]
df3=spark.createDataFrame(listA,['id','name','age'])

  1. union, intersect and subtract require the DataFrames to have the same columns (see the unionByName note after the output)
    >>> df1.show()
    +---+------+---+
    | id| name|age|
    +---+------+---+
    | 1| lisi| 25|
    | 2|zhangs| 20|
    | 3| liw| 18|
    | 4| wangw| 10|
    +---+------+---+

    >>> df2.show()
    +---+-------+---+
    | id| name|age|
    +---+-------+---+
    | 4| wangw| 10|
    | 5|zhangss| 1|
    +---+-------+---+

    >>> df1.union(df2).show()
    +---+-------+---+
    | id| name|age|
    +---+-------+---+
    | 1| lisi| 25|
    | 2| zhangs| 20|
    | 3| liw| 18|
    | 4| wangw| 10|
    | 4| wangw| 10|
    | 5|zhangss| 1|
    +---+-------+---+
    >>> df1.intersect(df2).show()
    +---+-----+---+
    | id| name|age|
    +---+-----+---+
    | 4|wangw| 10|
    +---+-----+---+

    >>> df1.subtract(df2).show()
    +---+------+---+
    | id| name|age|
    +---+------+---+
    | 1| lisi| 25|
    | 3| liw| 18|
    | 2|zhangs| 20|
    +---+------+---+
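
    Note that union matches columns by position, not by name, so frames with the same columns in a different order can be silently misaligned. If your Spark version has unionByName (2.3+), it matches columns by name instead. A minimal sketch:

    # df2 with its columns in a different order
    df2_reordered = df2.select('age', 'id', 'name')

    # union() would pair these columns positionally; unionByName() pairs them by name
    df1.unionByName(df2_reordered).show()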

XIV. join operations

Prepare the data

>>> employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28),(5, "Kevin", 26),(6, "Vincent", 35)]
>>> employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])
>>> salary=[(1,1000),(2,2000),(3,3000),(4,4000)]
>>> salary=spark.createDataFrame(salary, schema=["emp_id","salary"])
>>> lang=[(1,"java"),(3,"html"),(4,"hadoop"),(5,"C#")]
>>> lang=spark.createDataFrame(lang, schema=["emp_id","lang"])

>>> employees.show()
+------+-------+---+
|emp_id| name|age|
+------+-------+---+
| 1| John| 25|
| 2| Ray| 35|
| 3| Mike| 24|
| 4| Jane| 28|
| 5| Kevin| 26|
| 6|Vincent| 35|
+------+-------+---+

>>> salary.show()
+------+------+
|emp_id|salary|
+------+------+
| 1| 1000|
| 2| 2000|
| 3| 3000|
| 4| 4000|
+------+------+
>>> lang.show()
+------+------+
|emp_id| lang|
+------+------+
| 1| java|
| 3| html|
| 4|hadoop|
| 5| C#|
+------+------+

1. (inner) join: join defaults to an inner join; note the duplicate emp_id column in the result

>>> employees.join(salary, employees.emp_id == salary.emp_id).show()
+------+----+---+------+------+
|emp_id|name|age|emp_id|salary|
+------+----+---+------+------+
| 1|John| 25| 1| 1000|
| 3|Mike| 24| 3| 3000|
| 2| Ray| 35| 2| 2000|
| 4|Jane| 28| 4| 4000|
+------+----+---+------+------+

To avoid the duplicate column: 1. select only the columns you need; 2. join with on= (the key must have the same name on both sides); a third option, dropping the duplicate column, is sketched after the output below
>>> employees.join(salary, on="emp_id").show()
or
>>> employees.join(salary, "emp_id").show()
+------+----+---+------+
|emp_id|name|age|salary|
+------+----+---+------+
| 1|John| 25| 1000|
| 3|Mike| 24| 3000|
| 2| Ray| 35| 2000|
| 4|Jane| 28| 4000|
+------+----+---+------+
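
A third option, useful when the key names differ, is to keep the condition-style join and drop the duplicate column afterwards. A minimal sketch:

# join on an explicit condition, then drop the right-hand emp_id column
(employees.join(salary, employees.emp_id == salary.emp_id)
          .drop(salary.emp_id)
          .show())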

  2. outer join (full outer join)

    >>> employees.join(salary, employees.emp_id == salary.emp_id, 'outer').show()
    +------+-------+---+------+------+
    |emp_id| name|age|emp_id|salary|
    +------+-------+---+------+------+
    | 6|Vincent| 35| null| null|
    | 5| Kevin| 26| null| null|
    | 1| John| 25| 1| 1000|
    | 3| Mike| 24| 3| 3000|
    | 2| Ray| 35| 2| 2000|
    | 4| Jane| 28| 4| 4000|
    +------+-------+---+------+------+

    >>> cond = [employees.emp_id == salary.emp_id]
    >>> employees.join(salary, cond, 'outer').show()
    ...
  3. A left join works the same way (how='left')

  4. Inner join on multiple columns (lang2 below is not built in this snippet; a sketch of how it could be constructed follows the output)

    >>> lang2.show()
    +------+------+-----+---+
    |emp_id| lang| name|age|
    +------+------+-----+---+
    | 5| C#|Kevin| 26|
    | 1| java| John| 25|
    | 3| html| Mike| 24|
    | 4|hadoop| Jane| 28|
    +------+------+-----+---+

    >>> lang.show()
    +------+------+
    |emp_id| lang|
    +------+------+
    | 1| java|
    | 3| html|
    | 4|hadoop|
    | 5| C#|
    +------+------+

    >>> lang1=lang.withColumn('hobby',lit(1))
    >>> lang1.show()
    +------+------+-----+
    |emp_id| lang|hobby|
    +------+------+-----+
    | 1| java| 1|
    | 3| html| 1|
    | 4|hadoop| 1|
    | 5| C#| 1|
    +------+------+-----+

    >>> lang1.join(lang2,['emp_id','lang']).show()
    +------+------+-----+-----+---+
    |emp_id| lang|hobby| name|age|
    +------+------+-----+-----+---+
    | 5| C#| 1|Kevin| 26|
    | 1| java| 1| John| 25|
    | 3| html| 1| Mike| 24|
    | 4|hadoop| 1| Jane| 28|
    +------+------+-----+-----+---+
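
    lang2 is used above without being constructed; a minimal sketch of how it could be built from the earlier DataFrames (an assumption that matches the output shown):

    # hypothetical construction of lang2: attach name and age to each lang row
    lang2 = lang.join(employees, on='emp_id').select('emp_id', 'lang', 'name', 'age')
    lang2.show()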