How to provide conditions for an UPSERT in Databricks - PySpark
I have a table demo_table_one
in which I want to upsert the following values
data = [
(11111 , 'CA', '2020-01-26'),
(11111 , 'CA', '2020-02-26'),
(88888 , 'CA', '2020-06-10'),
(88888 , 'CA', '2020-05-10'),
(88888 , 'WA', '2020-07-10'),
(88888 , 'WA', '2020-07-15'),
(55555 , 'WA', '2020-05-15'),
(55555 , 'CA', '2020-03-15'),
]
columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)
The logic is that for each attom_id & state_code we only want the latest sell_date
So the data in my table should be like
[
(11111 , 'CA', '2020-02-26'),
(88888 , 'CA', '2020-06-10'),
(88888 , 'WA', '2020-07-15'),
(55555 , 'CA', '2020-03-15')
]
and I have the following code to do it
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, "demo_table_one")
#perform the UPSERT
(deltaTable.alias('original_table')
.merge(df.alias('update_table'),
"original_table.state_code = update_table.state_code and original_table.attom_id = update_table.attom_id")
.whenNotMatchedInsertAll()
.whenMatchedUpdateAll("original_table.sell_date < update_table.sell_date")
.execute())
But this inserts all the values in the table
Comments (1)
You first need to run a query on the input that keeps only the record with the maximum sell_date for each attom_id and state_code. Your
.whenNotMatchedInsertAll()
clause fires for every source row that has no match in the target, which is why all the records end up inserted. To be honest I don't find the Databricks docs so great on this, but this is what I would do (you could also do the deduplication in SQL beforehand):