返回介绍

数学基础

统计学习

深度学习

工具

Scala

九、functions

发布于 2023-07-17 23:38:23 字数 33952 浏览 0 评论 0 收藏 0

  1. pyspark.sql.functions 模块提供了一些内建的函数,它们用于创建Column

    • 它们通常多有公共的参数 col,表示列名或者Column
    • 它们的返回结果通常都是Column

9.1 数学函数

这里的col 都是数值列。

  1. abs(col):计算绝对值

  2. acos(col):计算acos

  3. cos(col):计算cos

  4. cosh(col):计算cosh

  5. asin(col):计算asin

  6. atan(col):计算atan

  7. atan2(col1,col2):计算从直角坐标 $ MathJax-Element-4 $ 到极坐标 $ MathJax-Element-5 $ 的角度 $ MathJax-Element-6 $

  8. bround(col,scale=0):计算四舍五入的结果。如果scale>=0,则使用HALF_EVEN 舍入模式;如果scale<0,则将其舍入到整数部分。

  9. cbrt(col):计算立方根

  10. ceil(col):计算ceiling

  11. floor(col):计算floor

  12. corr(col1,col2):计算两列的皮尔逊相关系数

  13. covar_pop(col1,col2):计算两列的总体协方差 (公式中的除数是 N )

  14. covar_samp(col1,col2):计算两列的样本协方差 (公式中的除数是 N-1 )

  15. degrees(col):将弧度制转换为角度制

  16. radians(col):将角度制转换为弧度制

  17. exp(col):计算指数: $ MathJax-Element-7 $

  18. expml(col):计算指数减一: $ MathJax-Element-8 $

  19. fractorial(col):计算阶乘

  20. pow(col1,col2) :返回幂级数 $ MathJax-Element-9 $

  21. hash(*cols):计算指定的一些列的hash code,返回一个整数列

    • 参数:

      • cols:一组列名或者Columns
  22. hypot(col1,col2):计算 $ MathJax-Element-10 $ (没有中间产出的上溢出、下溢出),返回一个数值列

  23. log(arg1,arg2=None):计算对数。其中第一个参数为底数。如果只有一个参数,则使用自然底数。

    • 参数:

      • arg1:如果有两个参数,则它给出了底数。否则就是对它求自然底数。
      • arg2:如果有两个参数,则对它求对数。
  24. log10(col):计算基于10的对数

  25. log1p(col):计算 $ MathJax-Element-11 $

  26. log2(col):计算基于2的对数

  27. rand(seed=None):从均匀分布U~[0.0,1.0] 生成一个独立同分布(i.i.d) 的随机列

    • 参数:

      • seed:一个整数,表示随机数种子。
  28. randn(seed=None):从标准正态分布N~(0.0,1.0) 生成一个独立同分布(i.i.d) 的随机列

    • 参数:

      • seed:一个整数,表示随机数种子。
  29. rint(col):返回最接近参数值的整数的double 形式。

  30. round(col,scale=0):返回指定参数的四舍五入形式。

    如果scale>=0,则使用HALF_UP 的舍入模式;否则直接取参数的整数部分。

  31. signum(col):计算正负号

  32. sin(col):计算sin

  33. sinh(col):计算 sinh

  34. sqrt(col):计算平方根

  35. tan(col):计算tan

  36. tanh(col):计算tanh

  37. toDegreees(col):废弃。使用degrees() 代替

  38. toRadias(col):废弃,使用radians() 代替

9.2 字符串函数

  1. ascii(col):返回一个数值列,它是旧列的字符串中的首个字母的ascii 值。其中col 必须是字符串列。

  2. base64(col):返回一个字符串列,它是旧列(二进制值)的BASE64编码得到的字符串。其中col 必须是二进制列。

  3. bin(col):返回一个字符串列,它是旧列(二进制值)的字符串表示(如二进制1101 的字符串表示为'1101' )其中col 必须是二进制列。

  4. cov(col,fromBase,toBase):返回一个字符串列,它是一个数字的字符串表达从fromBase 转换到toBase

    • 参数:

      • col:一个字符串列,它是数字的表达。如1028。它的基数由fromBase 给出
      • fromBase:一个整数,col 中字符串的数值的基数。
      • toBase:一个整数,要转换的数值的基数。
    • 示例:

      
      
      xxxxxxxxxx
      df = spark_session.createDataFrame([("010101",)], ['n']) df.select(conv(df.n, 2, 16).alias('hex')).collect() # 结果:[Row(hex=u'15')]
  5. concat(*cols):创建一个新列,它是指定列的字符串拼接的结果(没有分隔符)。

    • 参数

      • cols:列名字符串列表,或者Column 列表。要求这些列具有同样的数据类型
  6. concat_ws(sep,*cols):创建一个新列,它是指定列的字符串使用指定的分隔符拼接的结果。

    • 参数

      • sep:一个字符串,表示分隔符
      • cols:列名字符串列表,或者Column 列表。要求这些列具有同样的数据类型
  7. decode(col,charset):从二进制列根据指定字符集来解码成字符串。

    • 参数:

      • col:一个字符串或者Column,为二进制列
      • charset:一个字符串,表示字符集。
  8. encode(col,charset):把字符串编码成二进制格式。

    • 参数:

      • col:一个字符串或者Column,为字符串列
      • charset:一个字符串,表示字符集。
  9. format_number(col,d):格式化数值成字符串,根据HALF_EVEN 来四舍五入成d 位的小数。

    • 参数:

      • col:一个字符串或者Column,为数值列
      • d:一个整数,格式化成表示d 位小数。
  10. format_string(format,*cols):返回print 风格的格式化字符串。

    • 参数:

      • formatprint 风格的格式化字符串。如%s%d
      • cols:一组列名或者Columns,用于填充format
  11. hex(col):计算指定列的十六进制值(以字符串表示)。

    • 参数:

      • col:一个字符串或者Column,为字符串列、二进制列、或者整数列
  12. initcap(col):将句子中每个单词的首字母大写。

    • 参数:

      • col:一个字符串或者Column,为字符串列
  13. input_file_name():为当前的spark task 的文件名创建一个字符串列

  14. instr(str,substr):给出substrstr 的首次出现的位置。位置不是从0开始,而是从1开始的。

    如果substr 不在str 中,则返回 0 。

    如果str 或者 substrnull,则返回null

    • 参数:

      • str:一个字符串或者Column,为字符串列
      • substr:一个字符串
  15. locate(substr,str,pos=1):给出substrstr 的首次出现的位置(在pos 之后)。位置不是从0开始,而是从1开始的。

    如果substr 不在str 中,则返回 0 。

    如果str 或者 substrnull,则返回null

    • 参数:

      • str:一个字符串或者Column,为字符串列
      • substr:一个字符串
      • pos::起始位置(基于0开始)
  16. length(col):计算字符串或者字节的长度。

    • 参数:

      • col:一个字符串或者Column,为字符串列,或者为字节列。
  17. levenshtein(left,right):计算两个字符串之间的Levenshtein 距离。

    Levenshtein 距离:刻画两个字符串之间的差异度。它是从一个字符串修改到另一个字符串时,其中编辑单个字符串(修改、插入、删除)所需要的最少次数。

  18. lower(col):转换字符串到小写

  19. lpad(col,len,pad):对字符串,向左填充。

    • 参数:

      • col:一个字符串或者Column,为字符串列
      • len:预期填充后的字符串长度
      • pad:填充字符串
  20. ltrim(col):裁剪字符串的左侧空格

  21. md5(col):计算指定列的MD5 值(一个32字符的十六进制字符串)

  22. regexp_extract(str,pattern,idx):通过正则表达式抽取字符串中指定的子串 。

    • 参数:

      • str:一个字符串或者Column,为字符串列,表示被抽取的字符串。
      • pattern: 一个Java 正则表达式子串。
      • idx:表示抽取第几个匹配的结果。
    • 返回值:如果未匹配到,则返回空字符串。

  23. .regexp_replace(str,pattern,replacement): 通过正则表达式替换字符串中指定的子串。

    • 参数:

      • str:一个字符串或者Column,为字符串列,表示被替换的字符串。
      • pattern: 一个Java 正则表达式子串。
      • replacement:表示替换的子串
    • 返回值:如果未匹配到,则返回空字符串。

  24. repeat(col,n):重复一个字符串列n次,结果返回一个新的字符串列。

    • 参数:

      • col:一个字符串或者Column,为字符串列
      • n:一个整数,表示重复次数
  25. reverse(col):翻转一个字符串列,结果返回一个新的字符串列

  26. rpad(col,len,pad):向右填充字符串到指定长度。

    • 参数:

      • col:一个字符串或者Column,为字符串列
      • len: 指定的长度
      • pad:填充字符串
  27. rtrim(col):剔除字符串右侧的空格符

  28. sha1(col): 以16进制字符串的形式返回SHA-1 的结果

  29. sha2(col,numBites):以16进制字符串的形式返回SHA-2 的结果。

    numBites 指定了结果的位数(可以为 244,256,384,512,或者0表示256

  30. soundex(col):返回字符串的SoundEx 编码

  31. split(str,pattern): 利用正则表达式拆分字符串。产生一个array

    • 参数:

      • str:一个字符串或者Column,为字符串列
      • pattern:一个字符串,表示正则表达式
  32. substring(str,pos,len):抽取子串。

    • 参数:

      • str:一个字符串或者Column,为字符串列,或者字节串列
      • pos:抽取的起始位置
      • len:抽取的子串长度
    • 返回值:如果str 表示字符串列,则返回的是子字符串。如果str 是字节串列,则返回的是字节子串。

  33. substring_index(str,delim,count):抽取子串

    • 参数:

      • str: 一个字符串或者Column,为字符串列
      • delim:一个字符串,表示分隔符
      • count:指定子串的下标。 如果为正数,则从左开始,遇到第countdelim 时,返回其左侧的内容; 如果为负数,则从右开始,遇到第abs(count)delim 时,返回其右侧的内容;
    • 示例:

      
      
      xxxxxxxxxx
      df = spark.createDataFrame([('a.b.c.d',)], ['s']) df.select(substring_index(df.s, '.', 2).alias('s')).collect() # [Row(s=u'a.b')] df.select(substring_index(df.s, '.', -3).alias('s')).collect() # [Row(s=u'b.c.d')]
  34. translate(srcCol,matching,replace):将srcCol 中指定的字符替换成另外的字符。

    • 参数:

      • srcCol: 一个字符串或者Column,为字符串列
      • matching: 一个字符串。只要srcCol 中的字符串,有任何字符匹配了它,则执行替换
      • replace:它一一对应于matching 中要替换的字符
    • 示例:

      
      
      xxxxxxxxxx
      df = spark.createDataFrame([('translate',)], ['a']) df.select(translate('a', "rnlt", "123") .alias('r')).collect() # [Row(r=u'1a2s3ae')] # r->1, n->2,l->3, t->空字符
  35. trim(col):剔除字符串两侧的空格符

  36. unbase64(col): 对字符串列执行BASE64 编码,并且返回一个二进制列

  37. unhex(col):对字符串列执行hex 的逆运算。 给定一个十进制数字字符串,将其逆转换为十六进制数字字符串。

  38. upper(col):将字符串列转换为大写格式

9.3 日期函数

  1. add_months(start, months):增加月份

    • 参数:

      • start:列名或者Column 表达式,指定起始时间
      • months:指定增加的月份
    • 示例:

      
      
      xxxxxxxxxx
      df = spark_session.createDataFrame([('2015-04-08',)], ['d']) df.select(add_months(df.d, 1).alias('d')) # 结果为:[Row(d=datetime.date(2015, 5, 8))]
  2. current_data():返回当前日期作为一列

  3. current_timestamp():返回当前的时间戳作为一列

  4. date_add(start,days):增加天数

    • 参数:

      • start:列名或者Column 表达式,指定起始时间
      • days:指定增加的天数
  5. date_sub(start,days):减去天数

    • 参数:

      • start:列名或者Column 表达式,指定起始时间
      • days:指定减去的天数
  6. date_diff(end,start):返回两个日期之间的天数差值

    • 参数:

      • end:列名或者Column 表达式,指定结束时间。为date/timestamp/string
      • start:列名或者Column 表达式,指定起始时间。为date/timestamp/string
  7. date_format(date,format):转换date/timestamp/string 到指定格式的字符串。

    • 参数:

      • date:一个date/timestamp/string 列的列名或者Column
      • format:一个字符串,指定了日期的格式化形式。支持java.text.SimpleDateFormat 的所有格式。
  8. dayofmonth(col):返回日期是当月的第几天(一个整数)。其中coldate/timestamp/string

  9. dayofyear(col):返回日期是当年的第几天(一个整数)。其中coldate/timestamp/string

  10. from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss'):转换unix 时间戳到指定格式的字符串。

    • 参数:

      • timestamp:时间戳的列
      • format:时间格式化字符串
  11. from_utc_timestamp(timestamp, tz):转换unix 时间戳到指定时区的日期。

  12. hour(col):从指定时间中抽取小时,返回一个整数列

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列,或者datetime
  13. minute(col):从指定时间中抽取分钟,返回一个整数列

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列,或者datetime
  14. second(col):从指定的日期中抽取秒,返回一个整数列。

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列,或者datetime
  15. month(col):从指定时间中抽取月份,返回一个整数列

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列,或者datetime
  16. quarter(col):从指定时间中抽取季度,返回一个整数列

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列,或者datetime
  17. last_day(date):返回指定日期的当月最后一天(一个datetime.date

    • 参数:

      • date:一个字符串或者Column。是表示时间的字符串列,或者datetime
  18. months_between(date1,date2):返回date1date2 之间的月份(一个浮点数)。

    也就是date1-date2 的天数的月份数量。如果为正数,表明date1 > date2

    • 参数:

      • date1:一个字符串或者Column。是表示时间的字符串列,或者datetime
      • date2:一个字符串或者Column。是表示时间的字符串列,或者datetime
  19. next_day(date,dayOfWeek):返回指定天数之后的、且匹配dayOfWeek 的那一天。

    • 参数:

      • date1:一个字符串或者Column。是表示时间的字符串列,或者datetime
      • dayOfWeek:指定星期几。是大小写敏感的,可以为:'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'
  20. to_date(col,format=None):转换pyspark.sql.types.StringType 或者pyspark.sql.types.TimestampTypepyspark.pysql.types.DateType

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列
      • format:指定的格式。默认为yyyy-MM-dd
  21. to_timestamp(col,format=None):将StringType,TimestampType 转换为DataType

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列
      • format:指定的格式。默认为yyyy-MM-dd HH:mm:ss
  22. to_utc_timestamp(timestamp,tz):根据给定的时区,将StringType,TimestampType 转换为DataType

    • 参数:

      • col:一个字符串或者Column。是表示时间的字符串列
      • tz:一个字符串,表示时区
  23. trunc(date,format):裁剪日期到指定的格式 。

    • 参数:

      • date:一个字符串或者Column。是表示时间的字符串列
      • format:指定的格式。如: 'year','YYYY','yy','month','mon','mm','d'
  24. unix_timestamp(timestamp=None,format='yyyy-MM-dd HH:mm:ss') :给定一个unix timestamp(单位为秒),将其转换为指定格式的字符串。使用默认的时区和默认的locale

    如果转换失败,返回null

    如果timestamp=None,则返回当前的timestamp

    • 参数:

      • timestamp:一个unix 时间戳列。
      • format:指定转换的格式
  25. weekofyear(col): 返回给定时间是当年的第几周。返回一个整数。

  26. year(col):从日期中抽取年份,返回一个整数。

9.4 聚合函数

  1. count(col):计算每一组的元素的个数。

  2. avg(col):计算指定列的均值

  3. approx_count_distinct(col, rsd=None):统计指定列有多少个distinct

  4. countDistinct(col,*cols):计算一列或者一组列中的distinct value 的数量。

  5. collect_list(col):返回指定列的元素组成的列表(不会去重)

  6. collect_set(col):返回指定列的元素组成的集合(去重)

  7. first(col,ignorenulls=False):返回组内的第一个元素。

    如果ignorenulls=True,则忽略null 值,直到第一个非null 值。如果都是null,则返回null

    如果ignorenulls=False,则返回组内第一个元素(不管是不是null)

  8. last(col,ignorenulls=False):返回组内的最后一个元素。

    如果ignorenulls=True,则忽略null 值,直到最后一个非null 值。如果都是null,则返回null

    如果ignorenulls=False,则返回组内最后一个元素(不管是不是null)

  9. grouping(col):判断group by list 中的指定列是否被聚合。如果被聚合则返回1,否则返回 0。

  10. grouping_id(*cols):返回grouping 的级别。

    cols 必须严格匹配grouping columns,或者为空(表示所有的grouping columns)

  11. kurtosis(col):返回一组元素的峰度

  12. max(col):返回组内的最大值。

  13. mean(col):返回组内的均值

  14. min(col):返回组内的最小值

  15. skewness(col): 返回组内的偏度

  16. stddev(col):返回组内的样本标准差(分母除以 N-1

  17. stddev_pop(col):返回组内的总体标准差(分母除以 N

  18. stddev_samp(col): 返回组内的标准差,与stddev 相同

  19. sum(col):返回组内的和

  20. sumDistinct(col):返回组内distinct 值的和

  21. var_pop(col):返回组内的总体方差。 (分母除以 N

  22. var_samp(col):返回组内的样本方差 。(分母除以 N-1

  23. variance(col):返回组内的总体方差,与var_pop 相同

9.5 逻辑与按位函数

  1. .bitwiseNot(col) :返回一个字符串列,它是旧列的比特级的取反。
  2. isnan(col):返回指定的列是否是NaN
  3. isnull(col):返回指定的列是否为null
  4. shiftLeft(col,numBites):按位左移指定的比特位数。
  5. shiftRight(col,numBites):按位右移指定的比特位数。
  6. shiftRightUnsigned(col,numBites):按位右移指定的比特位数。但是无符号移动。

9.6 排序、拷贝

  1. asc(col):返回一个升序排列的Column
  2. desc(col):返回一个降序排列的Column
  3. col(col):返回值指定列组成的Column
  4. column(col):返回值指定列组成的Column

9.7 窗口函数

  1. window(timeColumn,windowDuration,slideDuration=None,startTime=None) :将rows 划分到一个或者多个窗口中(通过timestamp 列)

    • 参数:

      • timeColumn:一个时间列,用于划分window。它必须是pyspark.sql.types.TimestampType
      • windowDuration: 表示时间窗口间隔的字符串。如 '1 second','1 day 12 hours','2 minutes' 。单位字符串可以为'week','day','hour','minute','second','millisecond','microsecond'
      • slideDuration: 表示窗口滑动的间隔,即:下一个窗口移动多少。如果未提供,则窗口为 tumbling windows。 单位字符串可以为'week','day','hour','minute','second','millisecond','microsecond'
      • startTime:起始时间。它是1970-01-01 00:00:00 以来的相对偏移时刻。如,你需要在每个小时的15 分钟开启小时窗口,则它为15 minutes12:15-13:15,13:15-14:15,...
    • 返回值:返回一个称作windowstruct,它包含start,end(一个半开半闭区间)

  2. cume_dist():返回一个窗口中的累计分布概率。

  3. dense_rank():返回窗口内的排名。(1,2,... 表示排名为1,2,...

    它和rank() 的区别在于:dense_rank() 的排名没有跳跃(比如有3个排名为1,那么下一个排名是2,而不是下一个排名为4)

  4. rank():返回窗口内的排名。(1,2,... 表示排名为1,2,...)。

    如有3个排名为1,则下一个排名是 4。

  5. percent_rank():返回窗口的相对排名(如:百分比)

  6. lag(col,count=1,default=None):返回当前行之前偏移行的值。如果当前行之前的行数小于count,则返回default 值。

    • 参数:

      • col:一个字符串或者Column。开窗的列
      • count:偏移行
      • default:默认值
  7. lead(col,count=1,default=None):返回当前行之后偏移行的值。如果当前行之后的行数小于count,则返回default 值。

    • 参数:

      • col:一个字符串或者Column。开窗的列
      • count:偏移行
      • default:默认值
  8. ntile(n):返回有序窗口分区中的ntile group id (从 1 到 n

  9. row_number(): 返回一个序列,从 1 开始,到窗口的长度。

9.8 其它

  1. array(*cols):创新一个新的array 列。

    • 参数:

      • cols:列名字符串列表,或者Column 列表。要求这些列具有同样的数据类型
    • 示例:

      
      
      xxxxxxxxxx
      df.select(array('age', 'age').alias("arr")) df.select(array([df.age, df.age]).alias("arr"))
  2. array_contains(col, value):创建一个新列,指示value是否在array 中(由col 给定)

    其中col 必须是array 类型。而value 是一个值,或者一个Column 或者列名。

    • 判断逻辑:

      • 如果arraynull,则返回null
      • 如果value 位于 array 中,则返回True
      • 如果value 不在 array 中,则返回False
    • 示例:

      
      
      xxxxxxxxxx
      df = spark_session.createDataFrame([(["a", "b", "c"],), ([],)], ['data']) df.select(array_contains(df.data, "a"))
  3. create_map(*cols):创建一个map 列。

    • 参数:

      • cols:列名字符串列表,或者Column 列表。这些列组成了键值对。如(key1,value1,key2,value2,...)
    • 示例:

      
      
      xxxxxxxxxx
      df.select(create_map('name', 'age').alias("map")).collect() #[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
  4. broadcast(df):标记df 这个Dataframe 足够小,从而应用于broadcast join

    • 参数:

      • df:一个 Dataframe 对象
  1. coalesce(*cols):返回第一个非null 的列组成的Column。如果都为null,则返回null

    • 参数:

      • cols:列名字符串列表,或者Column 列表。
  2. crc32(col):计算二进制列的CRC32 校验值。要求col 是二进制列。

  3. explode(col):将一个array 或者 map 列拆成多行。要求col 是一个array 或者map 列。

    示例:

    
    
    xxxxxxxxxx
    eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) eDF.select(explode(eDF.intlist).alias("anInt")).collect() # 结果为:[Row(anInt=1), Row(anInt=2), Row(anInt=3)] eDF.select(explode(eDF.mapfield).alias("key", "value")).show() #结果为: # +---+-----+ # |key|value| # +---+-----+ # | a| b| # +---+-----+
  4. posexplode(col): 对指定array 或者map 中的每个元素,依据每个位置返回新的一行。

    要求col 是一个array 或者map 列。

    示例:

    
    
    xxxxxxxxxx
    eDF = spark_session.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})]) eDF.select(posexplode(eDF.intlist)).collect() #结果为:[Row(pos=0, col=1), Row(pos=1, col=2), Row(pos=2, col=3)]
  5. expr(str):计算表达式。

    • 参数:

      • str:一个表达式。如length(name)
  6. from_json(col,schema,options={}):解析一个包含JSON 字符串的列。如果遇到无法解析的字符串,则返回null

    • 参数:

      • col:一个字符串列,字符串是json 格式
      • schema:一个StructType(表示解析一个元素),或者StructTypeArrayType(表示解析一组元素)
      • options:用于控制解析过程。
    • 示例:

      
      
      xxxxxxxxxx
      from pyspark.sql.types import * schema = StructType([StructField("a", IntegerType())]) df = spark_session.createDataFrame([(1, '{"a": 1}')], ("key", "value")) df.select(from_json(df.value, schema).alias("json")).collect() #结果为:[Row(json=Row(a=1))]
  7. get_json_object(col,path):从json 字符串中提取指定的字段。如果json 字符串无效,则返回null.

    • 参数:

      • col:包含json 格式的字符串的列。
      • pathjson 的字段的路径。
    • 示例:

      
      
      xxxxxxxxxx
      data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')] df = spark_session.createDataFrame(data, ("key", "jstring")) df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), get_json_object(df.jstring, '$.f2').alias("c1") ).collect() # 结果为:[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]
  8. greatest(*cols):返回指定的一堆列中的最大值。要求至少包含2列。

    它会跳过null 值。如果都是null 值,则返回null

  9. least(*cols):返回指定的一堆列中的最小值。要求至少包含2列。

    它会跳过null 值。如果都是null 值,则返回null

  10. json_tuple(col,*fields):从json 列中抽取字段组成新列(抽取n 个字段,则生成n 列)

    • 参数:

      • col:一个json 字符串列
      • fields:一组字符串,给出了json 中待抽取的字段
  11. lit(col):创建一个字面量值的列

  12. monotonically_increasing_id():创建一个单调递增的id 列(64位整数)。

    它可以确保结果是单调递增的,并且是unique的,但是不保证是连续的。

    它隐含两个假设:

    • 假设dataframe 分区数量少于1 billion
    • 假设每个分区的记录数量少于8 billion
  13. nanvl(col1,col2):如果col1 不是NaN,则返回col1;否则返回col2

    要求col1col2 都是浮点列(DoubleType 或者 FloatType

  14. size(col):计算array/map 列的长度(元素个数)。

  15. sort_array(col,asc=True): 对array 列中的array 进行排序(排序的方式是自然的顺序)

    • 参数:

      • col:一个字符串或者Column, 指定一个array
      • asc: 如果为True,则是升序;否则是降序
  16. spark_partition_id():返回一个partition ID

    该方法产生的结果依赖于数据划分和任务调度,因此是未确定结果的。

  17. struct(*cols):创建一个新的struct 列。

    • 参数:

      • cols:一个字符串列表(指定了列名),或者一个Column 列表
    • 示例:

      
      
      xxxxxxxxxx
      df.select(struct('age', 'name').alias("struct")).collect() # [Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
  18. to_json(col,options={}):将包含 StructType 或者ArrytypeStructType 转换为json 字符串。如果遇到不支持的类型,则抛出异常。

    • 参数:

      • col:一个字符串或者Column,表示待转换的列
      • options:转换选项。它支持和json datasource 同样的选项
  19. udf(f=None,returnType=StringType):根据用户定义函数(UDF) 来创建一列。

    • 参数:

      • f:一个python 函数,它接受一个参数
      • returnType:一个pyspqrk.sql.types.DataType 类型,表示udf 的返回类型
    • 示例:

      
      
      xxxxxxxxxx
      from pyspark.sql.types import IntegerType slen = udf(lambda s: len(s), IntegerType()) df.select(slen("name").alias("slen_name"))
  20. when(condition,value): 对一系列条件求值,返回其中匹配的哪个结果。

    如果Column.otherwise() 未被调用,则当未匹配时,返回None;如果Column.otherwise() 被调用,则当未匹配时,返回otherwise() 的结果。

    • 参数:

      • condition:一个布尔列
      • value:一个字面量值,或者一个Column
    • 示例:

      
      
      xxxxxxxxxx
      df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect() # [Row(age=3), Row(age=4)]

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文