返回介绍

数学基础

统计学习

深度学习

工具

Scala

五、DataFrame

发布于 2023-07-17 23:38:23 字数 21724 浏览 0 评论 0 收藏 0

  1. 一个DataFrame 实例代表了基于命名列的分布式数据集。

  2. 为了访问DataFrame 的列,有两种方式:

    • 通过属性的方式:df.key
    • 通过字典的方式:df[key] 。推荐用这种方法,因为它更直观。

    它并不支持pandas.DataFrame 中其他的索引,以及各种切片方式

5.1 属性

  1. .columns:以列表的形式返回所有的列名
  2. .dtypes:以列表的形式返回所有的列的名字和数据类型。形式为:[(col_name1,col_type1),...]
  3. .isStreaming:如果数据集的数据源包含一个或者多个数据流,则返回True
  4. .na:返回一个DataFrameNaFunctions 对象,用于处理缺失值。
  5. .rdd: 返回DataFrame 底层的RDD(元素类型为Row
  6. .schema:返回DataFrameschema
  7. .stat:返回DataFrameStatFunctions 对象,用于统计
  8. .storageLevel:返回当前的缓存级别
  9. .write:返回一个DataFrameWriter对象,它是no-streaming DataFrame 的外部存储接口
  10. .writeStream:返回一个DataStreamWriter 对象,它是streaming DataFrame 的外部存储接口

5.2 方法

5.2.1 转换操作

  1. 聚合操作:

    • .agg(*exprs):在整个DataFrame 开展聚合操作(是df.groupBy.agg() 的快捷方式)

      示例:

      
      
      xxxxxxxxxx
      df.agg({"age": "max"}).collect() #在 agg 列上聚合 # 结果为:[Row(max(age)=5)] # 另一种方式: from pyspark.sql import functions as F df.agg(F.max(df.age)).collect()
    • .filter(condition):对行进行过滤。

      • 它是where() 的别名

      • 参数:

        • condition:一个types.BooleanTypeColumn,或者一个字符串形式的SQL 的表达式
      • 示例:

        
        
        xxxxxxxxxx
        df.filter(df.age > 3).collect() df.filter("age > 3").collect() df.where("age = 2").collect()
  2. 分组:

    • .cube(*cols):根据当前DataFrame 的指定列,创建一个多维的cube,从而方便我们之后的聚合过程。

      • 参数:

        • cols:指定的列名或者Column的列表
      • 返回值:一个GroupedData 对象

  • .groupBy(*cols):通过指定的列来将DataFrame 分组,从而方便我们之后的聚合过程。

    • 参数:

      • cols:指定的列名或者Column的列表
    • 返回值:一个GroupedData 对象

    • 它是groupby的别名

  • .rollup(*cols):创建一个多维的rollup,从而方便我们之后的聚合过程。

    • 参数:

      • cols:指定的列名或者Column的列表
    • 返回值:一个GroupedData 对象

  1. 排序:

    • .orderBy(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
    • .sort(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
      • 示例:

        ​x
        from pyspark.sql.functions import *
        df.sort(df.age.desc())
        df.sort("age", ascending=False)
        df.sort(asc("age"))
        ​
        df.orderBy(df.age.desc())
        df.orderBy("age", ascending=False)
        df.orderBy(asc("age"))
    • .sortWithinPartitions(*cols, **kwargs):返回一个新的DataFrame,它根据旧的DataFrame 指定列在每个分区进行排序

      • 参数:

        • cols:一个列名或者Column 的列表,指定了排序列

        • ascending:一个布尔值,或者一个布尔值列表。指定了升序还是降序排序

          • 如果是列表,则必须和cols 长度相同
  2. 调整分区:

    • .coalesce(numPartitions):返回一个新的DataFrame,拥有指定的numPartitions 分区。

      • 只能缩小分区数量,而无法扩张分区数量。如果numPartitions 比当前的分区数量大,则新的DataFrame 的分区数与旧DataFrame 相同

      • 它的效果是:不会混洗数据

      • 参数:

        • numPartitions:目标分区数量
    • .repartition(numPartitions, *cols):返回一个新的DataFrame,拥有指定的numPartitions 分区。

      • 结果DataFrame 是通过hash 来分区
      • 它可以增加分区数量,也可以缩小分区数量
  3. 集合操作:

    • .crossJoin(other):返回一个新的DataFrame,它是输入的两个DataFrame 的笛卡儿积

      可以理解为 [row1,row2],其中 row1 来自于第一个DataFramerow2 来自于第二个DataFrame

      • 参数:

        • other:另一个DataFrame 对象
    • .intersect(other):返回两个DataFrame 的行的交集

      • 参数:

        • other:另一个DataFrame 对象
    • .join(other,on=None,how=None):返回两个DataFramejoin

      • 参数:

        • other:另一个DataFrame 对象

        • on:指定了在哪些列上执行对齐。可以为字符串或者Column(指定单个列)、也可以为字符串列表或者Column 列表(指定多个列)

          注意:要求两个DataFrame 都存在这些列

        • how:指定join 的方式,默认为'inner'。可以为: innercrossouterfullfull_outerleftleft_outerrightright_outerleft_semileft_anti

    • .subtract(other):返回一个新的DataFrame,它的行由位于self 中、但是不在other 中的Row 组成。

      • 参数:

        • other:另一个DataFrame 对象
    • .union(other): 返回两个DataFrame的行的并集(它并不会去重)

      • 它是unionAll 的别名

      • 参数:

        • other:另一个DataFrame 对象
  4. 统计:

    • .crosstab(col1, col2):统计两列的成对频率。要求每一列的distinct 值数量少于 $ MathJax-Element-1 $ 个。最多返回 $ MathJax-Element-2 $ 对频率。

      • 它是DataFrameStatFunctions.crosstab() 的别名

      • 结果的第一列的列名为,col1_col2,值就是第一列的元素值。后面的列的列名就是第二列元素值,值就是对应的频率。

      • 参数:

        • col1,col2:列名字符串(或者Column
      • 示例:

        
        
        xxxxxxxxxx
        df =pd.DataFrame({'a':[1,3,5],'b':[2,4,6]}) s_df = spark_session.createDataFrame(df) s_df.crosstab('a','b').collect() #结果: [Row(a_b='5', 2=0, 4=0, 6=1), Row(a_b='1', 2=1, 4=0, 6=0), Row(a_b='3', 2=0, 4=1, 6=0)]
    • .describe(*cols):计算指定的数值列、字符串列的统计值。

      • 统计结果包括:count、mean、stddev、min、max

      • 该函数仅仅用于探索数据规律

      • 参数:

        • cols:列名或者多个列名字符串(或者Column)。如果未传入任何列名,则计算所有的数值列、字符串列
    • .freqItems(cols,support=None):寻找指定列中频繁出现的值(可能有误报)

      • 它是DataFrameStatFunctions.freqItems() 的别名

      • 参数:

        • cols:字符串的列表或者元组,指定了待考察的列
        • support:指定所谓的频繁的标准(默认是 1%)。该数值必须大于 $ MathJax-Element-3 $
  5. 移除数据:

    • .distinct():返回一个新的DataFrame,它保留了旧DataFrame 中的distinct 行。

      即:根据行来去重

    • .drop(*cols):返回一个新的DataFrame,它剔除了旧DataFrame 中的指定列。

      • 参数:

        • cols:列名字符串(或者Column)。如果它在旧DataFrame 中不存在,也不做任何操作(也不报错)
    • .dropDuplicates(subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的重复行。

      它与.distinct() 区别在于:它仅仅考虑指定的列来判断是否重复行。

      • 参数:

        • subset:列名集合(或者Column的集合)。如果为None,则考虑所有的列。
      • .drop_duplicates.dropDuplicates 的别名

    • .dropna(how='any', thresh=None, subset=None):返回一个新的DataFrame,它剔除了旧DataFrame 中的null行。

      • 它是DataFrameNaFunctions.drop() 的别名

      • 参数:

        • how:指定如何判断null 行的标准。'all':所有字段都是na,则是空行;'any':任何字段存在na,则是空行。
        • thresh:一个整数。当一行中,非null 的字段数量小于thresh 时,认为是空行。如果该参数设置,则不考虑how
        • subset:列名集合,给出了要考察的列。如果为None,则考察所有列。
    • .limit(num):返回一个新的DataFrame,它只有旧DataFrame 中的num行。

  6. 采样、拆分:

    • .randomSplit(weights, seed=None):返回一组新的DataFrame,它是旧DataFrame 的随机拆分

      • 参数:

        • weights:一个double的列表。它给出了每个结果DataFrame 的相对大小。如果列表的数值之和不等于 1.0,则它将被归一化为 1.0
        • seed:随机数种子
      • 示例:

        
        
        xxxxxxxxxx
        splits = df.randomSplit([1.0, 2.0], 24) splits[0].count()
    • .sample(withReplacement, fraction, seed=None):返回一个新的DataFrame,它是旧DataFrame 的采样

      • 参数:

        • withReplacement:如果为True,则可以重复采样;否则是无放回采样

        • fractions:新的DataFrame 的期望大小(占旧DataFrame的比例)。spark 并不保证结果刚好满足这个比例(只是一个期望值)

          • 如果withReplacement=True:则表示每个元素期望被选择的次数
          • 如果withReplacement=False:则表示每个元素期望被选择的概率
        • seed:随机数生成器的种子

    • .sampleBy(col, fractions, seed=None):返回一个新的DataFrame,它是旧DataFrame 的采样

      它执行的是无放回的分层采样。分层由col 列指定。

      • 参数:

        • col:列名或者Column,它给出了分层的依据
        • fractions:一个字典,给出了每个分层抽样的比例。如果某层未指定,则其比例视作 0
      • 示例:

        
        
        xxxxxxxxxx
        sampled = df.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) # df['key'] 这一列作为分层依据,0 抽取 10%, 1 抽取 20%
  7. 替换:

    • .replace(to_replace, value=None, subset=None):返回一组新的DataFrame,它是旧DataFrame 的数值替代结果

      • 它是DataFrameNaFunctions.replace() 的别名

      • 当替换时,value 将被类型转换到目标列

      • 参数:

        • to_replace:可以为布尔、整数、浮点数、字符串、列表、字典,给出了被替代的值。

          • 如果是字典,则给出了每一列要被替代的值
        • value:一个整数、浮点数、字符串、列表。给出了替代值。

        • subset:列名的列表。指定要执行替代的列。

    • .fillna(value, subset=None):返回一个新的DataFrame,它替换了旧DataFrame 中的null值。

      • 它是DataFrameNaFunctions.fill()的别名

      • 参数:

        • value:一个整数、浮点数、字符串、或者字典,用于替换null 值。如果是个字典,则忽略subset,字典的键就是列名,指定了该列的null值被替换的值。
        • subset:列名集合,给出了要被替换的列
  8. 选取数据:

    • .select(*cols):执行一个表达式,将其结果返回为一个DataFrame

      • 参数:

        • cols:一个列名的列表,或者Column 表达式。如果列名为*,则扩张到所有的列名
      • 示例:

        
        
        xxxxxxxxxx
        df.select('*') df.select('name', 'age') df.select(df.name, (df.age + 10).alias('age'))
    • .selectExpr(*expr):执行一个SQL 表达式,将其结果返回为一个DataFrame

      • 参数:

        • expr:一组SQL 的字符串描述
      • 示例:

        
        
        xxxxxxxxxx
        df.selectExpr("age * 2", "abs(age)")
    • .toDF(*cols):选取指定的列组成一个新的DataFrame

      • 参数:

        • cols:列名字符串的列表
    • .toJSON(use_unicode=True):返回一个新的DataFrame,它将旧的DataFrame 转换为RDD(元素为字符串),其中每一行转换为json 字符串。

  9. 列操作:

    • .withColumn(colName, col):返回一个新的DataFrame,它将旧的DataFrame 增加一列(或者替换现有的列)

      • 参数:

        • colName:一个列名,表示新增的列(如果是已有的列名,则是替换的列)
        • col:一个Column 表达式,表示新的列
      • 示例:

        
        
        xxxxxxxxxx
        df.withColumn('age2', df.age + 2)
    • .withColumnRenamed(existing, new):返回一个新的DataFrame,它将旧的DataFrame 的列重命名

      • 参数:

        • existing:一个字符串,表示现有的列的列名
        • col:一个字符串,表示新的列名

5.2.2 行动操作

  1. 查看数据:

    • .collect():以Row 的列表的形式返回所有的数据

    • .first():返回第一行(一个Row对象)

    • .head(n=None):返回前面的n

      • 参数:

        • n:返回行的数量。默认为1
      • 返回值:

        • 如果返回1行,则是一个Row 对象
        • 如果返回多行,则是一个Row 的列表
    • .show(n=20, truncate=True):在终端中打印前 n 行。

      • 它并不返回结果,而是print 结果

      • 参数:

        • n:打印的行数
        • truncate:如果为True,则超过20个字符的字符串被截断。如果为一个数字,则长度超过它的字符串将被截断。
    • .take(num):以Row 的列表的形式返回开始的num 行数据。

      • 参数:

        • num:返回行的数量
    • .toLocalIterator():返回一个迭代器,对它迭代的结果就是DataFrame的每一行数据(Row 对象)

  2. 统计:

    • .corr(col1, col2, method=None):计算两列的相关系数,返回一个浮点数。当前仅支持皮尔逊相关系数

      • DataFrame.corr()DataFrameStatFunctions.corr()的别名

      • 参数:

        • col,col2:为列的名字字符串(或者Column)。
        • method:当前只支持'pearson'
    • .cov(col1,col2):计算两列的协方差。

      • DataFrame.cov()DataFrameStatFunctions.cov()的别名

      • 参数:

        • col,col2:为列的名字字符串(或者Column
    • .count():返回当前DataFrame 有多少行

  3. 遍历:

    • .foreach(f):对DataFrame 中的每一行应用f

      • 它是df.rdd.foreach() 的快捷方式
    • .foreachPartition(f):对DataFrame 的每个分区应用f

      • 它是df.rdd.foreachPartition() 的快捷方式

      • 示例:

        
        
        xxxxxxxxxx
        def f(person): print(person.name) df.foreach(f) ​ def f(people): for person in people: print(person.name) df.foreachPartition(f)
    • .toPandas():将DataFrame 作为pandas.DataFrame 返回

      • 只有当数据较小,可以在驱动器程序中放得下时,才可以用该方法

5.2.3 其它方法

  1. 缓存:

    • .cache():使用默认的storage level 缓存DataFrame(缓存级别为:MEMORY_AND_DISK

    • .persist(storageLevel=StorageLevel(True, True, False, False, 1)):缓存DataFrame

      • 参数:

        • storageLevel:缓存级别。默认为MEMORY_AND_DISK
    • .unpersist(blocking=False):标记该DataFrame 为未缓存的,并且从内存和磁盘冲移除它的缓存块。

  2. .isLocal():如果collect()take() 方法能本地运行(不需要任何executor 节点),则返回True。否则返回False

  3. .printSchema():打印DataFrameschema

  4. .createTempView(name):创建一个临时视图,name 为视图名字。

    临时视图是session 级别的,会随着session 的消失而消失。

    • 如果指定的临时视图已存在,则抛出TempTableAlreadyExistsException 异常。

    • 参数:

      • name:视图名字
    • 示例:

      
      
      xxxxxxxxxx
      df.createTempView("people") df2 = spark_session.sql("select * from people")
  5. .createOrReplaceTempView(name):创建一个临时视图,name 为视图名字。如果该视图已存在,则替换它。

    • 参数:

      • name:视图名字
  6. .createGlobalTempView(name):创建一个全局临时视图,name 为视图名字

    spark sql 中的临时视图是session 级别的,会随着session 的消失而消失。如果希望一个临时视图跨session 而存在,则可以建立一个全局临时视图。

    • 如果指定的全局临时视图已存在,则抛出TempTableAlreadyExistsException 异常。

    • 全局临时视图存在于系统数据库global_temp 中,必须加上库名取引用它

    • 参数:

      • name:视图名字
    • 示例:

      
      
      xxxxxxxxxx
      df.createGlobalTempView("people") spark_session.sql("SELECT * FROM global_temp.people").show()
  7. .createOrReplaceGlobalTempView(name):创建一个全局临时视图,name 为视图名字。如果该视图已存在,则替换它。

    • 参数:

      • name:视图名字
  8. .registerTempTable(name):创建一个临时表,name 为表的名字。

    spark 2.0 中被废弃,推荐使用createOrReplaceTempView

  9. .explain(extended=False):打印logical planphysical plan,用于调试模式

    • 参数:

      • extended:如果为False,则仅仅打印physical plan

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文