How to provide conditions for an UPSERT in Databricks - PySpark

Posted 2025-01-21 00:38:29


I have a table demo_table_one into which I want to upsert the following values:

data = [
    (11111 , 'CA', '2020-01-26'),
    (11111 , 'CA', '2020-02-26'),
    (88888 , 'CA', '2020-06-10'),
    (88888 , 'CA', '2020-05-10'),
    (88888 , 'WA', '2020-07-10'),
    (88888 , 'WA', '2020-07-15'),
    (55555 , 'WA', '2020-05-15'),
    (55555 , 'CA', '2020-03-15'),
    ]

columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)

The logic is that for each attom_id & state_code we only want the latest sell_date, so the data in my table should end up like:

[
    (11111 , 'CA', '2020-02-26'),
    (88888 , 'CA', '2020-06-10'),
    (88888 , 'WA', '2020-07-15'),
    (55555 , 'CA', '2020-03-15')
]

and I have the following code to do it

from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, "demo_table_one") 

# perform the UPSERT
(deltaTable.alias('original_table')
    .merge(df.alias('update_table'),
        "original_table.state_code = update_table.state_code and original_table.attom_id = update_table.attom_id")
    .whenNotMatchedInsertAll()
    .whenMatchedUpdateAll("original_table.sell_date < update_table.sell_date")
    .execute())

But this inserts all of the values into the table.


Comments (1)

标点 2025-01-28 00:38:29


You need to run a SQL query on the input first, to reduce it to the record with the maximum sell_date for each key.

Your .whenNotMatchedInsertAll() means that every source record without a match is inserted, which is why all of the rows end up in the table.

I find the Databricks docs not so great on this, to be honest, but this is what I would do (you could also run the SQL aggregation as a separate step beforehand):

data = [
    (11111 , 'CA', '2020-01-26'),
    (11111 , 'CA', '2020-02-26'),
    (88888 , 'CA', '2020-06-10'),
    (88888 , 'CA', '2020-05-10'),
    (88888 , 'WA', '2020-07-10'),
    (88888 , 'WA', '2020-07-15'),
    (55555 , 'WA', '2020-05-15'),
    (55555 , 'CA', '2020-03-15'),
    ]

columns = ['attom_id', 'state_code', 'sell_date']
df = spark.createDataFrame(data, columns)
# register the incoming rows as a temp view so the SQL MERGE can reference them
df.createOrReplaceTempView("newer_data")

# collapse the source to the latest sell_date per (attom_id, state_code), then MERGE into the target
spark.sql(""" MERGE INTO DEMO_TABLE_ONE
              USING ( SELECT attom_id, state_code, max(sell_date) AS sell_date
                      FROM newer_data
                      GROUP BY attom_id, state_code ) AS NEWER
              ON DEMO_TABLE_ONE.attom_id = NEWER.attom_id AND DEMO_TABLE_ONE.state_code = NEWER.state_code
              WHEN MATCHED THEN
                UPDATE SET *
              WHEN NOT MATCHED THEN
                INSERT *
          """)