Improving the performance of near-real-time updates from RDS to Snowflake


As part of an hourly process, we source data from a table in RDS and ingest it into the raw data layer in Snowflake.

After ingestion, we have to update certain fields in the final mart table based on the newly ingested data.
This mart table is very wide, with more than 100 fields, and is around 10 GB in size.

These updates currently take 6-7 minutes.

Edit: Update Query and Explain Plan:

update <target_table> tgt
set email_id = src.email_id,
    -- Keep the existing name when the incoming value is missing, empty,
    -- or implausibly short (LEN(NULL) is NULL, so NULLs fall through to
    -- the second branch).
    first_name = case
                     when len(src.updated_first_name) < 2 then first_name
                     when src.updated_first_name is null
                          or src.updated_first_name = '' then first_name
                     else src.updated_first_name
                 end,
    last_name = case
                    when len(src.updated_last_name) < 2 then last_name
                    when src.updated_last_name is null
                         or src.updated_last_name = '' then last_name
                    else src.updated_last_name
                end,
    link = src._link,
    title = src.updated_title,
    is_verified = 1,
    last_verified = src.verified_time,
    last_active_type_update = src.verified_time,
    active_type = 'ENGAGED',
    title_id = src.title_id,
    function = src.function,
    role = src.role
from <sourcetable> src
where lower(tgt.email_id) = lower(src.email_id)
  and src.demographic_disposition in
      ('Updated Last Name', 'Updated Title', 'Updated First Name',
       'Verified', 'Updated Country', 'Partial Verify')
  and src.verified_time > '<last_runtime>';
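
One detail worth flagging (my note, not from the original post): when several source rows share the same lower(email_id), a multi-table UPDATE in Snowflake applies an arbitrary one of them, which is why the plan below reports a "number of multi-joined rows updated" counter. A minimal sketch of deduplicating the source first, assuming verified_time is a usable tiebreaker; the explain plan that follows is for the original UPDATE, not this variant:

-- Sketch: keep only the latest source row per e-mail before updating.
-- Assumes verified_time orders revisions; adjust the tiebreaker as needed.
update <target_table> tgt
set title = src.updated_title   -- remaining SET clauses as in the query above
from (
    select *
    from <sourcetable>
    where demographic_disposition in
          ('Updated Last Name', 'Updated Title', 'Updated First Name',
           'Verified', 'Updated Country', 'Partial Verify')
      and verified_time > '<last_runtime>'
    -- keep exactly one row per normalized e-mail
    qualify row_number() over (partition by lower(email_id)
                               order by verified_time desc) = 1
) src
where lower(tgt.email_id) = lower(src.email_id);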

Explain Plan:
{
  "GlobalStats": {
    "partitionsTotal": 728,
    "partitionsAssigned": 486,
    "bytesAssigned": 9182406144
  },
  "Operations": [
    [
      {
        "id": 0,
        "operation": "Result",
        "expressions": [
          "number of rows updated",
          "number of multi-joined rows updated"
        ]
      },
      {
        "id": 1,
        "parent": 0,
        "operation": "Update",
        "objects": ["<target_table>"]
      },
      {
        "id": 2,
        "parent": 1,
        "operation": "InnerJoin",
        "expressions": ["joinKey: (LOWER(src.EMAIL_ID) = LOWER(tgt.EMAIL_ID))"]
      },
      {
        "id": 3,
        "parent": 2,
        "operation": "Filter",
        "expressions": [
          "(src.DEMOGRAPHIC_DISPOSITION IN 'Updated Last Name' IN 'Updated Job Title' IN 'Updated First Name' IN 'Verified' IN 'Updated Country' IN 'Partial Verify') AND (src.VERIFIED_TIME > '<last_runtime>') AND (LOWER(src.EMAIL_ID) IS NOT NULL)"
        ]
      },
      {
        "id": 4,
        "parent": 3,
        "operation": "TableScan",
        "objects": ["<src_table>"],
        "expressions": [
          "EMAIL_ID", "LINK", "UPDATED_FIRST_NAME", "UPDATED_LAST_NAME",
          "UPDATED_TITLE", "DEMOGRAPHIC_DISPOSITION", "VERIFIED_TIME",
          "FUNCTION", "ROLE", "TITLE_ID"
        ],
        "alias": "BH",
        "partitionsAssigned": 1,
        "partitionsTotal": 243,
        "bytesAssigned": 1040384
      },
      {
        "id": 5,
        "parent": 2,
        "operation": "Filter",
        "expressions": ["LOWER(tgt.EMAIL_ID) IS NOT NULL"]
      },
      {
        "id": 6,
        "parent": 5,
        "operation": "JoinFilter",
        "expressions": ["joinKey: (LOWER(src.EMAIL_ID) = LOWER(tgt.EMAIL_ID))"]
      },
      {
        "id": 7,
        "parent": 6,
        "operation": "TableScan",
        "objects": ["<target_table>"],
        "expressions": ["EMAIL_ID", "FIRST_NAME", "LAST_NAME"],
        "partitionsAssigned": 485,
        "partitionsTotal": 485,
        "bytesAssigned": 9181365760
      }
    ]
  ]
}
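
Reading this plan (my interpretation, not part of the original post): the source scan is well pruned (1 of 243 partitions, ~1 MB), but the target scan reads all 485 of its partitions (~9.1 GB), because wrapping tgt.email_id in LOWER() gives Snowflake nothing to prune on, so nearly the whole 6-7 minutes goes into scanning and rewriting the wide mart table. One possible mitigation is sketched below: persist a pre-lowercased join key and cluster on it. mart_table and email_id_lower are illustrative names, and note that automatic clustering is itself billed, so the saving would need to be measured against that cost:

-- Sketch, assuming the load process can also populate the new column
-- going forward; mart_table / email_id_lower are hypothetical names.
alter table mart_table add column email_id_lower varchar;

-- One-time backfill; this is itself a full-table rewrite, so run it once.
update mart_table set email_id_lower = lower(email_id);

-- Cluster on the normalized key so selective joins can prune partitions.
alter table mart_table cluster by (email_id_lower);

-- The incremental update can then join without a function on the target:
--     where tgt.email_id_lower = lower(src.email_id)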

The new requirement is that this update job run every 5 minutes, so that we have near-real-time data in Snowflake.

However, we have tried every query optimisation we could and are unable to bring the execution time of the updates below 5 minutes.

We are using a Small warehouse in Snowflake, and due to budget constraints we can't scale it up to speed up the update query.

Is there any other budget-friendly way to do near-real-time updates (after ingesting the data) in Snowflake?
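
For what it's worth, one pattern often suggested for exactly this situation (my addition, not from the original post) is to let a stream on the raw table track the newly ingested rows and drive the update from a scheduled task, so each 5-minute run processes only the delta and is skipped outright, with no warehouse resume and no compute billed, when nothing new arrived. A minimal sketch with hypothetical object names:

-- Sketch of a stream + task pipeline; raw_table, mart_table, and my_xs_wh
-- are assumptions, not details from the post.
create or replace stream raw_changes on table raw_table
    append_only = true;   -- the raw layer only receives inserts

create or replace task apply_mart_updates
    warehouse = my_xs_wh
    schedule = '5 MINUTE'
    when system$stream_has_data('RAW_CHANGES')   -- skip runs with no new data
as
    merge into mart_table tgt
    using raw_changes src                        -- consuming the stream advances its offset
    on lower(tgt.email_id) = lower(src.email_id)
    when matched then update set
        title = src.updated_title,
        last_verified = src.verified_time;       -- remaining columns as in the UPDATE above

alter task apply_mart_updates resume;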
