Delta Lake: replaceWhere not working with a date-formatted partition column
My use case is that I want to partition my table by date. On different dates, rows will be appended, but if the code is rerun on the same date, that date's data should be overwritten.
After looking online, it seemed like this task could be done using Delta Lake's replaceWhere feature, but I am fine with any solution that involves Parquet.
I have the following code:
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.types import DateType, StringType, StructField, StructType

# Create (or reuse) the active SparkSession.
spark = SparkSession.builder.getOrCreate()

data = [(date(2022, 6, 19), "Hello"), (date(2022, 6, 19), "World")]
schema = StructType([StructField("date", DateType()), StructField("message", StringType())])
df = spark.createDataFrame(data, schema=schema)

df.write.partitionBy("date").option("replaceWhere", "date = '2022-06-19'").save("/tmp/test", mode="overwrite", format="delta")
df.write.partitionBy("date").option("replaceWhere", "date = '2022-06-19'").save("/tmp/test_3", mode="overwrite", format="delta")
At the second write call, the code throws the following exception:
pyspark.sql.utils.AnalysisException: Data written out does not match replaceWhere 'date = '2022-06-19''.
CHECK constraint EXPRESSION(('date = 2022-06-19)) (date = '2022-06-19') violated by row with values:
- date : 17337
Comments (1)
This issue generally occurs because the partition column does not hold the same value as the partition you want to replace.
Here, your issue might be that your partition column is in date format; maybe, if you try it with a string instead, it should work fine.
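A minimal sketch of that suggestion, assuming the same data as in the question (the /tmp/test_str path is hypothetical, not from the original post): convert the partition column to a 'yyyy-MM-dd' string before writing, so the value stored in the partition compares like-for-like with the string literal in the replaceWhere predicate.

from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format
from pyspark.sql.types import DateType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

data = [(date(2022, 6, 19), "Hello"), (date(2022, 6, 19), "World")]
schema = StructType([StructField("date", DateType()), StructField("message", StringType())])
df = spark.createDataFrame(data, schema=schema)

# Store the partition column as a 'yyyy-MM-dd' string so it matches
# the string literal in the replaceWhere predicate.
df = df.withColumn("date", date_format(col("date"), "yyyy-MM-dd"))

# Hypothetical output path; reruns for the same day replace only that partition.
df.write.partitionBy("date").option("replaceWhere", "date = '2022-06-19'").save("/tmp/test_str", mode="overwrite", format="delta")

With this setup, rerunning the write for a given day overwrites only that day's partition, while writes with a different date predicate leave the other partitions untouched, which matches the use case in the question.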