PySpark: timezone conversion in a UDF
Suppose we have a PySpark DataFrame that looks like the one below:
partition | min_date | max_date |
---|---|---|
A | 01/01/2020 | 01/01/2021 |
B | 01/06/2021 | 01/06/2022 |
I want to add a column, say date_list, that contains a list of dates between min_date and max_date (inclusive) for a given (pandas) frequency.
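For context, the list meant here is what pandas' date_range yields for a frequency string; a purely illustrative example with made-up dates:

```python
import pandas as pd

# Month-end ("M") frequency between two made-up dates, just to show what a
# pandas frequency string produces; the real column is built in Spark below.
pd.date_range("2020-01-01", "2020-04-01", freq="M")
# DatetimeIndex(['2020-01-31', '2020-02-29', '2020-03-31'], dtype='datetime64[ns]', freq='M')
```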
After some investigation, I concluded that a UDF would be the best way to achieve this.
import datetime
from typing import List
import pandas as pd
import pyspark.sql.functions as sf
import pyspark.sql.types as st

def create_date_range_column_factory(frequency: str):
    """Returns a udf that generates a list of dates between min and max with a given frequency"""
    def create_date_range_column(min_date: datetime.datetime, max_date: datetime.datetime) -> List[datetime.datetime]:
        # Using to_pydatetime() as returning a pandas Timestamp gives an error.
        return [pdt.to_pydatetime() for pdt in pd.date_range(min_date, max_date, freq=frequency)]
    return sf.udf(create_date_range_column, st.ArrayType(st.TimestampType()))
# Create a UDF that generates a list of dates with monthly ("M") frequency
date_range_udf = create_date_range_column_factory("M")
sdf = sdf.withColumn("date_list", date_range_udf(sf.col("min_date"), sf.col("max_date")))
df = sdf.toPandas()
# Just to get an easy to read output
output_list = [str(x) for x in df['date_list'][0]]
output_list
then looks like:
['2020-01-31 01:00:00', '2020-02-29 01:00:00', '2020-03-31 01:00:00', '2020-04-30 01:00:00', '2020-05-31 01:00:00', '2020-06-30 01:00:00', '2020-07-31 01:00:00', '2020-08-31 01:00:00', '2020-09-30 01:00:00', '2020-10-31 01:00:00', '2020-11-30 01:00:00', '2020-12-31 01:00:00']
Obviously, this is not what I expected. I've tried setting tz=pytz.utc in pd.date_range, but that output puzzles me:
['2020-01-31 02:00:00', '2020-02-29 02:00:00', '2020-03-31 03:00:00', '2020-04-30 03:00:00', '2020-05-31 03:00:00', '2020-06-30 03:00:00', '2020-07-31 03:00:00', '2020-08-31 03:00:00', '2020-09-30 03:00:00', '2020-10-31 02:00:00', '2020-11-30 02:00:00', '2020-12-31 02:00:00']
I've set spark.sql.session.timeZone to UTC. I'm using Spark 3.1.2 and Python 3.9.
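For reference, a minimal sketch of how that option can be set on an active session (assuming it was set in code rather than in spark-defaults.conf):

```python
# Assumption: `spark` is the active SparkSession.
spark.conf.set("spark.sql.session.timeZone", "UTC")
```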
What else I tried
- Passing a pandas DataFrame with tz-aware timestamps and creating a Spark DataFrame from there
- Passing strings and converting them in Spark (see the sketch after this list)
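As a rough illustration, the string-based attempt could look something like the sketch below; the helper name, the string format, and the final cast are assumptions, not the exact code that was run:

```python
import pandas as pd
import pyspark.sql.functions as sf
import pyspark.sql.types as st

# Hypothetical variant: the UDF returns ISO-formatted strings and the
# conversion to timestamps is done by Spark via an array cast.
def create_date_range_string_udf(frequency: str):
    def create_date_range(min_date, max_date):
        return [ts.strftime("%Y-%m-%d %H:%M:%S")
                for ts in pd.date_range(min_date, max_date, freq=frequency)]
    return sf.udf(create_date_range, st.ArrayType(st.StringType()))

sdf = (
    sdf.withColumn("date_list_str",
                   create_date_range_string_udf("M")(sf.col("min_date"), sf.col("max_date")))
       .withColumn("date_list",
                   sf.col("date_list_str").cast(st.ArrayType(st.TimestampType())))
)
```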
Question
Probably something is going wrong when converting the date range generated by the UDF back to PySpark TimestampType, but I can't figure out what I'm doing wrong. Any ideas/input are greatly appreciated.
Obviously, if anyone has a better way of achieving this (without a UDF), that is also welcome.
Comments (1)
Are these dates in d/M/y or M/d/y format? It seems that Spark is not parsing these dates correctly.
Here I suggest another approach that avoids UDFs and working with pandas, which, as you may know, can lead to OOM errors when working with big data. You can also try this code to check whether your environment returns the expected output. I used a small date range so the results are easier to see.
Here are the steps performed (a sketch of them in code follows the list):
1 - Transform min_date and max_date to date format.
2 - Calculate the unix_timestamp difference between these two dates and divide it by 86400 to get the delta in days (1 day = 86400 s).
3 - Create a string of ','s, one for each day, split and explode it, and derive the dates from the resulting positions.
4 - Group by partition, min and max dates, then collect the dates in each group into a list.
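A minimal sketch of these four steps, assuming dd/MM/yyyy input strings, an existing SparkSession named spark, and a small made-up sample row:

```python
import pyspark.sql.functions as sf

# Small, made-up sample so the exploded output stays readable.
sdf = spark.createDataFrame(
    [("A", "01/01/2020", "05/01/2020")],
    ["partition", "min_date", "max_date"],
)

result = (
    sdf
    # 1 - parse the strings into DateType columns
    .withColumn("min_d", sf.to_date("min_date", "dd/MM/yyyy"))
    .withColumn("max_d", sf.to_date("max_date", "dd/MM/yyyy"))
    # 2 - day difference via unix_timestamp (1 day = 86400 s)
    .withColumn(
        "delta_days",
        ((sf.unix_timestamp("max_d") - sf.unix_timestamp("min_d")) / 86400).cast("int"),
    )
    # 3 - one ',' per day, split + posexplode, then turn each position into a date
    .select(
        "partition", "min_d", "max_d",
        sf.posexplode(sf.expr("split(repeat(',', delta_days), ',')")).alias("pos", "dummy"),
    )
    .withColumn("date", sf.expr("date_add(min_d, pos)"))
    # 4 - group back and collect the generated dates into a list
    .groupBy("partition", "min_d", "max_d")
    .agg(sf.collect_list("date").alias("date_list"))
)

result.show(truncate=False)
```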
Hope this is the outcome you are looking for.