Deltalake: replaceWhere not working with date format

Posted on 2025-02-08 17:50:08

My use case is that I want to partition my table by date. On different dates, rows will be appended, but if the code is rerun on the same date, the existing partition should be overwritten.

After looking online, it seemed this task could be done with Delta Lake's replaceWhere feature, but I am fine with any solution that involves Parquet.

I have the following code:

from datetime import date
from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType, StructField, StructType

# `spark` must exist before use; on Databricks or in the pyspark shell a
# Delta-enabled session is provided, otherwise create one (the delta-spark
# package and its SQL extensions must be configured).
spark = SparkSession.builder.getOrCreate()

data = [(date(2022, 6, 19), "Hello"), (date(2022, 6, 19), "World")]

schema = StructType([
    StructField("date", DateType()),
    StructField("message", StringType()),
])

df = spark.createDataFrame(data, schema=schema)

# Overwrite only the 2022-06-19 partition of each Delta table.
df.write.partitionBy("date").option("replaceWhere", "date = '2022-06-19'") \
    .save("/tmp/test", mode="overwrite", format="delta")
df.write.partitionBy("date").option("replaceWhere", "date = '2022-06-19'") \
    .save("/tmp/test_3", mode="overwrite", format="delta")

The second write call throws the following exception:

pyspark.sql.utils.AnalysisException: Data written out does not match replaceWhere 'date = '2022-06-19''.
CHECK constraint EXPRESSION(('date = 2022-06-19)) (date = '2022-06-19') violated by row with values:
 - date : 17337


Comments (1)

向日葵 2025-02-15 17:50:08


This issue generally arises because the partition column does not hold the same value as the partition you want to replace.

Here, the problem is likely that your partition column is in date format; if you try it with a string, it should work fine.
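
A minimal sketch of that suggestion, assuming the same toy data as in the question: the partition column is stored as a StringType in yyyy-MM-dd form, so it compares directly against the string literal in the replaceWhere predicate. (The "date : 17337" in the error above is the DateType value rendered internally as days since the Unix epoch.) The paths are illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

# Assumes a Delta-enabled Spark session, as in the question.
spark = SparkSession.builder.getOrCreate()

data = [("2022-06-19", "Hello"), ("2022-06-19", "World")]
schema = StructType([
    StructField("date", StringType()),    # partition column kept as a string
    StructField("message", StringType()),
])
df = spark.createDataFrame(data, schema=schema)

# Rerunning this for the same date replaces only the matching partition;
# writes for other dates leave it untouched.
(df.write
    .partitionBy("date")
    .option("replaceWhere", "date = '2022-06-19'")
    .save("/tmp/test", mode="overwrite", format="delta"))

Equivalently, an existing DateType column could be cast at write time with df.withColumn("date", df["date"].cast("string")).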
