PySpark: timezone conversion in a UDF


Suppose we have a PySpark DataFrame that looks like the one below:

partition | min_date   | max_date
A         | 01/01/2020 | 01/01/2021
B         | 01/06/2021 | 01/06/2022

I want to add a column, say date_list, that contains a list of dates between the min_date and max_date (inclusive) for a given (pandas) frequency.

After some investigation, I concluded that a UDF would be the best way to achieve this.

import datetime
from typing import List

import pandas as pd
import pyspark.sql.functions as sf
import pyspark.sql.types as st

def create_date_range_column_factory(frequency: str):
    """Returns a UDF that generates a list of dates between min and max with a given frequency."""
    def create_date_range_column(min_date: datetime.datetime, max_date: datetime.datetime) -> List[datetime.datetime]:
        # Using to_pydatetime() because returning a pandas Timestamp gives an error.
        return [pdt.to_pydatetime() for pdt in pd.date_range(min_date, max_date, freq=frequency)]

    return sf.udf(create_date_range_column, st.ArrayType(st.TimestampType()))

# Create a UDF that generates a list with month-end frequency ("M" is month end in pandas)
date_range_udf = create_date_range_column_factory("M")

sdf = sdf.withColumn("date_list", date_range_udf(sf.col("min_date"), sf.col("max_date")))

df = sdf.toPandas()

# Just to get an easy to read output
output_list = [str(x) for x in df['date_list'][0]]

output_list then looks like:

['2020-01-31 01:00:00', '2020-02-29 01:00:00', '2020-03-31 01:00:00', '2020-04-30 01:00:00', '2020-05-31 01:00:00', '2020-06-30 01:00:00', '2020-07-31 01:00:00', '2020-08-31 01:00:00', '2020-09-30 01:00:00', '2020-10-31 01:00:00', '2020-11-30 01:00:00', '2020-12-31 01:00:00']

Obviously, this is not what I expected. I've tried setting tz=pytz.utc in pd.date_range, but the output puzzles me:

['2020-01-31 02:00:00', '2020-02-29 02:00:00', '2020-03-31 03:00:00', '2020-04-30 03:00:00', '2020-05-31 03:00:00', '2020-06-30 03:00:00', '2020-07-31 03:00:00', '2020-08-31 03:00:00', '2020-09-30 03:00:00', '2020-10-31 02:00:00', '2020-11-30 02:00:00', '2020-12-31 02:00:00']
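For reference, a minimal sketch of that variant (only the pd.date_range call in the inner function changes; freq="M" is hard-coded and the _utc name is just for illustration):

import pandas as pd
import pytz

def create_date_range_column_utc(min_date, max_date):
    # Same inner function as above, but the generated range is made tz-aware (UTC).
    return [pdt.to_pydatetime() for pdt in pd.date_range(min_date, max_date, freq="M", tz=pytz.utc)]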

I've set spark.sql.session.timeZone to UTC.
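For completeness, a sketch of how that is set (assuming spark is the active SparkSession):

# Set and verify the session timezone; it can also be passed via --conf or the SparkSession builder.
spark.conf.set("spark.sql.session.timeZone", "UTC")
print(spark.conf.get("spark.sql.session.timeZone"))  # UTC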

I'm using Spark 3.1.2 and Python 3.9.

What else I tried

  • Passing a pandas DataFrame with tz-aware timestamps and creating a Spark DataFrame from it
  • Passing strings and converting them to timestamps in Spark (see the sketch below)
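A minimal sketch of the string-based attempt, assuming the UDF returned the dates as formatted strings in a column called date_list_str; the column name and format pattern are illustrative only:

import pyspark.sql.functions as sf

# Hypothetical: convert an array of formatted strings into an array of timestamps on the Spark side.
sdf = sdf.withColumn(
    "date_list",
    sf.expr("transform(date_list_str, x -> to_timestamp(x, 'yyyy-MM-dd HH:mm:ss'))"),
)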

Question

Probably something is going wrong when the date range generated by the UDF is converted back to PySpark's TimestampType, but I can't figure out what I'm doing wrong. Any ideas/input would be greatly appreciated.
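One way to narrow down where the shift happens is to inspect the values on the Spark side before calling toPandas(), since collecting to pandas performs its own timestamp conversion. A diagnostic sketch using the column names from above:

import pyspark.sql.functions as sf

# Render the generated timestamps as strings on the Spark side (formatted in the session timezone),
# to separate a shift introduced by the UDF from one introduced by toPandas().
sdf.select(sf.explode("date_list").alias("d")) \
   .select(sf.date_format("d", "yyyy-MM-dd HH:mm:ss").alias("d_str")) \
   .show(truncate=False)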

Of course, if anyone has a better way of achieving this (without a UDF), that is also welcome.


1 Answer

对风讲故事 2025-02-20 20:39:43


Are these dates in d/M/y or M/d/y format? It seems that Spark is not parsing these dates correctly.

Here I suggest another approach that avoids UDFs and pandas, which, as you may know, can lead to OOM errors when working with big data. You can also run this code to check whether your environment returns the expected output. I used a small date range so the results are easier to see.

# example DataFrame:

df = spark.createDataFrame(
    [
    ('A','01/01/2020','05/01/2020'),
    ('B','02/06/2021','04/06/2021')
    ],
    ["partition", "min_date", "max_date"]
)

df.show()

+---------+----------+----------+
|partition|  min_date|  max_date|
+---------+----------+----------+
|        A|01/01/2020|05/01/2020|
|        B|02/06/2021|04/06/2021|
+---------+----------+----------+

Here are the steps performed:

1 - convert min_date and max_date to date format

2 - calculate the unix_timestamp difference between the two dates and divide it by 86400 to get the delta in days (1 day = 86400 s)

3 - build a string of commas, one per day, split it into an array, and posexplode it to get one row per day together with its index

4 - group by partition, min_date and max_date, then collect the generated timestamps for each group into a list

import pyspark.sql.functions as F

(
    df
    # 1 - parse the date strings (day/month/year) into DateType
    .withColumn('min_date', F.to_date(F.col('min_date'), 'd/M/y'))
    .withColumn('max_date', F.to_date(F.col('max_date'), 'd/M/y'))
    # 2 - difference between the two dates in days (1 day = 86400 s)
    .withColumn("timedelta", (F.unix_timestamp('max_date') - F.unix_timestamp('min_date')) / 86400)
    # 3 - one array element per day, exploded together with its index (days_count)
    .withColumn("repeat", F.expr("split(repeat(',', timedelta), ',')"))
    .select("*", F.posexplode("repeat").alias("days_count", "val"))
    .withColumn("interval_date_time_exp",
                (F.unix_timestamp("min_date") + F.col("days_count") * 86400).cast('timestamp'))
    # 4 - collect the generated timestamps back into a list per partition
    .groupby('partition', 'min_date', 'max_date').agg(F.collect_list('interval_date_time_exp'))
    .show(truncate=False)
)

+---------+----------+----------+---------------------------------------------------------------------------------------------------------+
|partition|min_date  |max_date  |collect_list(interval_date_time_exp)                                                                     |
+---------+----------+----------+---------------------------------------------------------------------------------------------------------+
|A        |2020-01-01|2020-01-05|[2020-01-01 00:00:00, 2020-01-02 00:00:00, 2020-01-03 00:00:00, 2020-01-04 00:00:00, 2020-01-05 00:00:00]|
|B        |2021-06-02|2021-06-04|[2021-06-02 00:00:00, 2021-06-03 00:00:00, 2021-06-04 00:00:00]                                          |
+---------+----------+----------+---------------------------------------------------------------------------------------------------------+

Hope this is the outcome you are looking for.
