返回介绍

数学基础

统计学习

深度学习

工具

Scala

二、SparkSession

发布于 2023-07-17 23:38:23 字数 5981 浏览 0 评论 0 收藏 0

  1. spark sql 中所有功能的入口点是SparkSession 类。它可以用于创建DataFrame、注册DataFrametable、在table 上执行SQL、缓存table、读写文件等等。

  2. 要创建一个SparkSession,仅仅使用SparkSession.builder 即可:

    
    from pyspark.sql import SparkSession
    spark_session = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
  3. Builder 用于创建SparkSession,它的方法有(这些方法都返回self ):

    • .appName(name):给程序设定一个名字,用于在Spark web UI 中展示。如果未指定,则spark 会随机生成一个。

      • name:一个字符串,表示程序的名字
    • .config(key=None,value=None,conf=None):配置程序。这里设定的配置会直接传递给SparkConfSparkSession各自的配置。

      • key:一个字符串,表示配置名
      • value:对应配置的值
      • conf:一个SparkConf 实例

      有两种设置方式:

      • 通过键值对设置:

        
        
        xxxxxxxxxx
        SparkSession.builder.config("spark.some.config.option", "some-value")
      • 通过已有的SparkConf 设置:

        
        
        xxxxxxxxxx
        SparkSession.builder.config(conf=SparkConf())
    • .enableHiveSupport():开启Hive 支持。(spark 2.0 的新接口)

    • .master(master):设置spark master URL。如:

      • master=local:表示单机本地运行
      • master=local[4]:表示单机本地4核运行
      • master=spark://master:7077:表示在一个spark standalone cluster 上运行
    • .getOrCreate():返回一个已有的SparkSession 实例;如果没有则基于当前builder 的配置,创建一个新的SparkSession 实例

      • 该方法首先检测是否有一个有效的全局默认SparkSession 实例。如果有,则返回它;如果没有,则创建一个作为全局默认SparkSession实例,并返回它
      • 如果已有一个有效的全局默认SparkSession 实例,则当前builder的配置将应用到该实例上

2.1 属性

  1. .builder = <pyspark.sql.session.Builder object at 0x7f51f134a110>:一个Builder实例

  2. .catalog:一个接口。用户通过它来create、drop、alter、query底层的数据库、table 以及 function

    • 可以通过SparkSession.catalog.cacheTable('tableName'), 来缓存表;通过SparkSession.catalog.uncacheTable('tableName') 来从缓存中删除该表。
  3. .confspark 的运行时配置接口。通过它,你可以获取、设置spark、hadoop 的配置。

  4. .read:返回一个DataFrameReader,用于从外部存储系统中读取数据并返回DataFrame

  5. .readStream:返回一个DataStreamReader,用于将输入数据流视作一个DataFrame 来读取

  6. .sparkContext:返回底层的SparkContext

  7. .streams:返回一个StreamingQueryManager对象,它管理当前上下文的所有活动的StreamingQuery

  8. .udf:返回一个UDFRegistration,用于UDF 注册

  9. .version:返回当前应用的spark 版本

2.2 方法

  1. .createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True):从RDD 、一个列表、或者pandas.DataFrame 中创建一个DataFrame

    • 参数:

      • data:输入数据。可以为一个RDD、一个列表、或者一个pandas.DataFrame

      • schema:给出了DataFrame 的结构化信息。可以为:

        • 一个字符串的列表:给出了列名信息。此时每一列数据的类型从data 中推断
        • None:此时要求data 是一个RDD,且元素类型为Row、namedtuple、dict 之一。此时结构化信息从data 中推断(推断列名、列类型)
        • pyspqrk.sql.types.StructType:此时直接指定了每一列数据的类型。
        • pyspark.sql.types.DataType 或者datatype string:此时直接指定了一列数据的类型,会自动封装成pyspqrk.sql.types.StructType(只有一列)。此时要求指定的类型与data 匹配(否则抛出异常)
      • samplingRatio:如果需要推断数据类型,则它指定了需要多少比例的行记录来执行推断。如果为None,则只使用第一行来推断。

      • verifySchema:如果为True,则根据schema 检验每一行数据

    • 返回值:一个DataFrame实例

  2. .newSession():返回一个新的SparkSession实例,它拥有独立的SQLConfregistered temporary views and UDFs,但是共享同样的SparkContext以及table cache

  3. .range(start,end=None,step=1,numPartitions=None):创建一个DataFrame,它只有一列。该列的列名为id,类型为pyspark.sql.types.LongType,数值为区间[start,end),间隔为step(即:list(range(start,end,step)) )

  4. .sql(sqlQuery):查询SQL 并以DataFrame 的形式返回查询结果

  5. .stop():停止底层的SparkContext

  6. .table(tableName):以DataFrame的形式返回指定的table

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文