Reading partitioned parquet records in PySpark

Posted on 2025-02-11 19:27:06

I have a parquet file partitioned by a date field (YYYY-MM-DD).
How can I efficiently read only the (current date - 1 day) records from the file in PySpark? Please suggest.

PS: I would not like to read the entire file and then filter the records, as the data volume is huge.

2 Answers

风启觞 2025-02-18 19:27:06

There are multiple ways to go about this.
Suppose this is the input data and you write out the dataframe partitioned on the "date" column:

import datetime
from pyspark.sql.types import StructType, StructField, DateType, StringType

data = [(datetime.date(2022, 6, 12), "Hello"), (datetime.date(2022, 6, 19), "World")]
schema = StructType([StructField("date", DateType()), StructField("message", StringType())])
df = spark.createDataFrame(data, schema=schema)
df.write.mode('overwrite').partitionBy('date').parquet('./test')

You can read the parquet files associated with a given date using this syntax:

spark.read.parquet('./test/date=2022-06-19').show()

# The catch is that the date column will be omitted from your dataframe
+-------+                                                                       
|message|
+-------+
|  World|
+-------+

# You can add the date column back with lit():
from pyspark.sql import functions as f

(spark.read.parquet('./test/date=2022-06-19')
 .withColumn('date', f.lit('2022-06-19').cast(DateType()))
 .show()
)
# Output
+-------+----------+
|message|      date|
+-------+----------+
|  World|2022-06-19|
+-------+----------+
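
Another option worth knowing about is the basePath read option: as far as I understand, it tells Spark where partition discovery starts, so the partition column is kept even when you point the reader at a single partition directory. A minimal sketch, assuming the './test' layout written above:

# basePath makes Spark treat './test' as the table root, so 'date' is retained
(spark.read
 .option('basePath', './test')
 .parquet('./test/date=2022-06-19')
 .show()
)
# Expected to print both the 'message' and 'date' columns for 2022-06-19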

A more efficient solution is to use Delta tables:

# Write to a separate path so the Delta table does not collide with the plain parquet data above
df.write.mode('overwrite').partitionBy('date').format('delta').save('./test_delta')
spark.read.format('delta').load('./test_delta').where(f.col('date') == '2022-06-19').show()

The Spark engine uses the _delta_log to optimize your query and reads only the parquet files that are relevant to it (plain parquet reads can also prune partitions; see the sketch after the output below). Also, the output will have all the columns:

+-------+----------+
|message|      date|
+-------+----------+
|  World|2022-06-19|
+-------+----------+
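
For completeness: plain parquet reads also benefit from partition pruning when you read the base path and filter on the partition column, so Delta is not strictly required for this. A small sketch, assuming the './test' parquet layout from the first snippet:

from pyspark.sql import functions as f

# Filtering on the partition column should prune the read down to the matching directory
df_day = spark.read.parquet('./test').where(f.col('date') == '2022-06-19')
df_day.explain()  # the physical plan should show PartitionFilters on 'date'
df_day.show()     # output keeps both 'message' and 'date'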
小嗷兮 2025-02-18 19:27:06

You can read it by passing a date variable while reading.

This is dynamic code; you don't need to hardcode the date, just append it to the path:

>>> df.show()
+-----+-----------------+-----------+----------+
|Sr_No|          User_Id|Transaction|        dt|
+-----+-----------------+-----------+----------+
|    1|paytm 111002203@p|       100D|2022-06-29|
|    2|paytm 111002203@p|        50C|2022-06-27|
|    3|paytm 111002203@p|        20C|2022-06-26|
|    4|paytm 111002203@p|        10C|2022-06-25|
|    5|             null|         1C|2022-06-24|
+-----+-----------------+-----------+----------+

>>> df.write.partitionBy("dt").mode("append").parquet("/dir1/dir2/sample.parquet")

>>> from datetime import date
>>> from datetime import timedelta
>>> today = date.today()
# Here I am taking the date two days back; for one day back use timedelta(days=1)
>>> yesterday = today - timedelta(days=2)
>>> two_days_back = yesterday.strftime('%Y-%m-%d')

>>> path = "/dir1/dir2/sample.parquet/dt=" + two_days_back
>>> spark.read.parquet(path).show()
+-----+-----------------+-----------+
|Sr_No|          User_Id|Transaction|
+-----+-----------------+-----------+
|    2|paytm 111002203@p|        50C|
+-----+-----------------+-----------+
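
For the original question (current date - 1 day), here is a sketch that keeps the dt column by reading the base path and filtering on the partition column instead of appending it to the path; assuming the same /dir1/dir2/sample.parquet layout, the filter should still restrict the read to yesterday's partition:

from datetime import date, timedelta
from pyspark.sql import functions as f

yesterday = (date.today() - timedelta(days=1)).strftime('%Y-%m-%d')

# Filtering on the partition column prunes the scan to yesterday's partition
# while keeping 'dt' in the result.
(spark.read.parquet("/dir1/dir2/sample.parquet")
 .where(f.col("dt") == yesterday)
 .show()
)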