返回介绍

数学基础

统计学习

深度学习

工具

Scala

三、DataFrame 创建

发布于 2023-07-17 23:38:23 字数 20377 浏览 0 评论 0 收藏 0

  1. 在一个SparkSession 中,应用程序可以从一个已经存在的RDDHIVE表、或者spark数据源中创建一个DataFrame

3.1 从列表创建

  1. 未指定列名:

    
    
    xxxxxxxxxx
    l = [('Alice', 1)] spark_session.createDataFrame(l).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(_1=u'Alice', _2=1)] #自动分配列名
  2. 指定列名:

    
    
    xxxxxxxxxx
    l = [('Alice', 1)] spark_session.createDataFrame(l, ['name', 'age']).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(name=u'Alice', age=1)]
  3. 通过字典指定列名:

    
    
    xxxxxxxxxx
    d = [{'name': 'Alice', 'age': 1}] spark_session.createDataFrame(d).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(age=1, name=u'Alice')]

3.2 从 RDD 创建

  1. 未指定列名:

    
    
    xxxxxxxxxx
    rdd = sc.parallelize([('Alice', 1)]) spark_session.createDataFrame(rdd).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(_1=u'Alice', _2=1)] #自动分配列名
  2. 指定列名:

    
    
    xxxxxxxxxx
    rdd = sc.parallelize([('Alice', 1)]) spark_session.createDataFrame(rdd, ['name', 'age']).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(name=u'Alice', age=1)]
  3. 通过Row 来创建:

    
    
    xxxxxxxxxx
    from pyspark.sql import Row Person = Row('name', 'age') rdd = sc.parallelize([('Alice', 1)]).map(lambda r: Person(*r)) spark_session.createDataFrame(rdd, ['name', 'age']).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(name=u'Alice', age=1)]
  4. 指定schema

    
    
    xxxxxxxxxx
    from pyspark.sql.types import * schema = StructType([ StructField("name", StringType(), True), StructField("age", IntegerType(), True)]) rdd = sc.parallelize([('Alice', 1)]) spark_session.createDataFrame(rdd, schema).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(name=u'Alice', age=1)]
  5. 通过字符串指定schema

    
    
    xxxxxxxxxx
    rdd = sc.parallelize([('Alice', 1)]) spark_session.createDataFrame(rdd, "a: string, b: int").collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(name=u'Alice', age=1)]
    • 如果只有一列,则字符串schema 为:

      
      
      xxxxxxxxxx
      rdd = sc.parallelize([1]) spark_session.createDataFrame(rdd, "int").collect()

      结果为:

      
      
      xxxxxxxxxx
      [Row(value=1)]

3.3 从 pandas.DataFrame 创建

  1. 使用方式:

    
    
    xxxxxxxxxx
    df = pd.DataFrame({'a':[1,3,5],'b':[2,4,6]}) spark_session.createDataFrame(df).collect()

    结果为:

    
    
    xxxxxxxxxx
    [Row(a=1, b=2), Row(a=3, b=4), Row(a=5, b=6)]

3.4 从数据源创建

  1. 从数据源创建的接口是DataFrameReader

    
    
    xxxxxxxxxx
    reader = spark_session.read
  2. 另外,也可以不使用API ,直接将文件加载到DataFrame 并进行查询:

    
    
    xxxxxxxxxx
    df = spark_session.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

3.4.1 通用加载

  1. 设置数据格式:.format(source)

    • 返回self
    
    
    xxxxxxxxxx
    df = spark_session.read.format('json').load('python/test_support/sql/people.json')
  2. 设置数据schema.schema(schema)

    • 返回self
    • 某些数据源可以从输入数据中推断schema。一旦手动指定了schema,则不再需要推断。
  3. 加载:.load(path=None, format=None, schema=None, **options)

    • 参数:

      • path:一个字符串,或者字符串的列表。指出了文件的路径
      • format:指出了文件类型。默认为parquet(除非另有配置spark.sql.sources.default
      • schema:输入数据的schema,一个StructType 类型实例。
      • options:其他的参数
    • 返回值:一个DataFrame 实例

    • 示例:

      
      
      xxxxxxxxxx
      spark_session.read.format('json').load(['python/test_support/sql/people.json', 'python/test_support/sql/people1.json'])

3.4.2 专用加载

  1. .csv():加载csv 文件,返回一个DataFrame 实例

    
    
    xxxxxxxxxx
    .csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None)
  2. .jdbc():加载数据库中的表

    
    
    xxxxxxxxxx
    .jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
    • 参数:

      • url:一个JDBC URL,格式为:jdbc:subprotocol:subname
      • table:表名
      • column:列名。该列为整数列,用于分区。如果该参数被设置,那么numPartitions、lowerBound、upperBound 将用于分区从而生成where 表达式来拆分该列。
      • lowerBoundcolumn的最小值,用于决定分区的步长
      • upperBoundcolumn的最大值(不包含),用于决定分区的步长
      • numPartitions:分区的数量
      • predicates:一系列的表达式,用于where中。每一个表达式定义了DataFrame 的一个分区
      • properties:一个字典,用于定义JDBC 连接参数。通常至少为:{ 'user' : 'SYSTEM', 'password' : 'mypassword'}
    • 返回:一个DataFrame 实例

  3. .json():加载json 文件,返回一个DataFrame 实例

    
    
    xxxxxxxxxx
    .json(path, schema=None, primitivesAsString=None, prefersDecimal=None, allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, multiLine=None)

    示例:

    
    
    xxxxxxxxxx
    spark_session.read.json('python/test_support/sql/people.json') # 或者 rdd = sc.textFile('python/test_support/sql/people.json') spark_session.read.json(rdd)
  4. .orc():加载ORC文件,返回一个DataFrame 实例

    
    
    xxxxxxxxxx
    .orc(path)

    示例:

    
    
    xxxxxxxxxx
    spark_session.read.orc('python/test_support/sql/orc_partitioned')
  5. .parquet():加载Parquet文件,返回一个DataFrame 实例

    .parquet(*paths)
    

    示例:

    
    
    xxxxxxxxxx
    spark_session.read.parquet('python/test_support/sql/parquet_partitioned')
  6. .table(): 从table 中创建一个DataFrame

    
    
    xxxxxxxxxx
    .table(tableName)

    示例:

    
    
    xxxxxxxxxx
    df = spark_session.read.parquet('python/test_support/sql/parquet_partitioned') df.createOrReplaceTempView('tmpTable') spark_session.read.table('tmpTable')
  7. .text():从文本中创建一个DataFrame

    
    
    xxxxxxxxxx
    .text(paths)

    它不同于.csv(),这里的DataFrame 只有一列,每行文本都是作为一个字符串。

    示例:

    
    
    xxxxxxxxxx
    spark_session.read.text('python/test_support/sql/text-test.txt').collect() #结果为:[Row(value=u'hello'), Row(value=u'this')]

3.5 从 Hive 表创建

  1. spark SQL 还支持读取和写入存储在Apache Hive 中的数据。但是由于Hive 具有大量依赖关系,因此这些依赖关系不包含在默认spark 版本中。

    • 如果在类路径中找到Hive 依赖项,则Spark 将会自动加载它们
    • 这些Hive 的依赖关系也必须存在于所有工作节点上
  2. 配置:将hive-site.xmlcore-site.html(用于安全配置)、hdfs-site.xml(用户HDFS 配置) 文件放在conf/ 目录中完成配置。

  3. 当使用Hive 时,必须使用启用Hive 支持的SparkSession 对象(enableHiveSupport

    • 如果未部署Hive,则开启Hive 支持不会报错
  4. hive-site.xml 未配置时,上下文会自动在当前目录中创建metastore_db,并创建由spark.sql.warehouse.dir 指定的目录

  5. 访问示例:

    
    
    xxxxxxxxxx
    from pyspark.sql import SparkSession spark_sess = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", '/home/xxx/yyy/') \ .enableHiveSupport() \ .getOrCreate() spark_sess.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark_sess.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") spark.sql("SELECT * FROM src").show()
  6. 创建Hive 表时,需要定义如何向/从文件系统读写数据,即:输入格式、输出格式。还需要定义该表的数据的序列化与反序列化。

    可以通过在OPTIONS 选项中指定这些属性:

    
    
    xxxxxxxxxx
    spark_sess.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive OPTIONS(fileFormat 'parquet')")

    可用的选项有:

    • fileFormat:文件格式。目前支持6种文件格式:'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile'、'avro'

    • inputFormat,outputFormat:这两个选项将相应的InputFormatOutputFormat 类的名称指定为字符串文字,如'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'

      • 这两个选项必须成对出现
      • 如果已经制定了fileFormat,则无法指定它们
    • serde:该选项指定了serde 类的名称

      • 如果给定的fileFormat 已经包含了serde 信息(如何序列化、反序列化的信息),则不要指定该选项
      • 目前的sequencefile、textfile、rcfile 不包含serde 信息,因此可以使用该选项
    • fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim:这些选项只能与textfile 文件格式一起使用,它们定义了如何将分隔的文件读入行。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文