Performance improvement for near real time updates from RDS to Snowflake
As part of an hourly process, we source data from a table in RDS and ingest it into the raw data layer in Snowflake.
After ingesting, we have to update certain fields in the final mart table based on the newly ingested data.
This mart table is very wide, with more than 100 fields, and it is around 10 GB in size.
These updates currently take 6-7 minutes.
Edit: Update Query and Explain Plan:
update <target_table> tgt
set email_id = src.email_id,
    first_name = (
        case
            when len(src.updated_first_name) < 2 then first_name
            when src.updated_first_name is null or src.updated_first_name = '' then first_name
            else src.updated_first_name
        end),
    last_name = (
        case
            when len(src.updated_last_name) < 2 then last_name
            when src.updated_last_name is null or src.updated_last_name = '' then last_name
            else src.updated_last_name
        end),
    link = src._link,
    title = src.updated_title,
    is_verified = 1,
    last_verified = src.verified_time,
    last_active_type_update = src.verified_time,
    active_type = 'ENGAGED',
    title_id = src.title_id,
    function = src.function,
    role = src.role
from <sourcetable> src
where lower(tgt.email_id) = lower(src.email_id)
  and src.demographic_disposition in ('Updated Last Name', 'Updated Title',
      'Updated First Name', 'Verified', 'Updated Country', 'Partial Verify')
  and src.verified_time > '<last_runtime>';
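(As an aside: the null/empty branches in the CASE expressions above are redundant, since `len(NULL)` yields NULL and `len('')` yields 0, so neither can satisfy `>= 2`. A sketch of the equivalent, shorter SET clauses, using the same columns as above:)

```sql
-- Equivalent sketch: a single ">= 2" length test already covers the
-- NULL, empty-string, and too-short cases from the original CASE branches.
first_name = case when len(src.updated_first_name) >= 2
                  then src.updated_first_name else first_name end,
last_name  = case when len(src.updated_last_name) >= 2
                  then src.updated_last_name else last_name end,
```

This shouldn't change the plan materially, but it makes the intent easier to see.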
Explain Plan:
{
"GlobalStats": {
"partitionsTotal": 728,
"partitionsAssigned": 486,
"bytesAssigned": 9182406144
},
"Operations": [
[
{
"id": 0,
"operation": "Result",
"expressions": [
"number of rows updated",
"number of multi-joined rows updated"
]
},
{
"id": 1,
"parent": 0,
"operation": "Update",
"objects": [
"<target_table>"
]
},
{
"id": 2,
"parent": 1,
"operation": "InnerJoin",
"expressions": [
"joinKey: (LOWER(src.EMAIL_ID) = LOWER(tgt.EMAIL_ID))"
]
},
{
"id": 3,
"parent": 2,
"operation": "Filter",
"expressions": [
"(src.DEMOGRAPHIC_DISPOSITION IN 'Updated Last Name' IN 'Updated Job Title' IN 'Updated First Name' IN 'Verified' IN 'Updated Country' IN 'Partial Verify') AND (src.VERIFIED_TIME > '<last_runtime>') AND (LOWER(src.EMAIL_ID) IS NOT NULL)"
]
},
{
"id": 4,
"parent": 3,
"operation": "TableScan",
"objects": [
"<src_table>"
],
"expressions": [
"EMAIL_ID",
"LINK",
"UPDATED_FIRST_NAME",
"UPDATED_LAST_NAME",
"UPDATED_TITLE",
"DEMOGRAPHIC_DISPOSITION",
"VERIFIED_TIME",
"FUNCTION",
"ROLE",
"TITLE_ID"
],
"alias": "BH",
"partitionsAssigned": 1,
"partitionsTotal": 243,
"bytesAssigned": 1040384
},
{
"id": 5,
"parent": 2,
"operation": "Filter",
"expressions": [
"LOWER(tgt.EMAIL_ID) IS NOT NULL"
]
},
{
"id": 6,
"parent": 5,
"operation": "JoinFilter",
"expressions": [
"joinKey: (LOWER(src.EMAIL_ID) = LOWER(tgt.EMAIL_ID))"
]
},
{
"id": 7,
"parent": 6,
"operation": "TableScan",
"objects": [
"<target_table>"
],
"expressions": [
"EMAIL_ID",
"FIRST_NAME",
"LAST_NAME"
],
"partitionsAssigned": 485,
"partitionsTotal": 485,
"bytesAssigned": 9181365760
}
]
]
}
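For context on the plan above: the source-side scan prunes down to 1 of 243 partitions, but the target-side scan touches all 485 of its 485 partitions (~9 GB), because the join key wraps `tgt.email_id` in `LOWER()` and the table has no clustering on that key. One idea we looked at (a sketch only, assuming we are free to add a column; `email_id_lower` is a hypothetical name) is persisting a lowercased join key on the target so the update join compares stored values directly:

```sql
-- Hypothetical sketch: persist a lowercased key on the target table so the
-- update join no longer wraps tgt.email_id in LOWER(), which may allow
-- partition pruning instead of a full 485-partition scan.
alter table <target_table> add column email_id_lower varchar;
update <target_table> set email_id_lower = lower(email_id);
alter table <target_table> cluster by (email_id_lower);

-- The update join condition would then become:
--   where tgt.email_id_lower = lower(src.email_id)
```

Whether this actually prunes depends on how well the 10 GB table clusters on a high-cardinality key like email, and automatic reclustering has its own credit cost, so this is only a sketch of the idea, not something we have validated.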
The new requirement is that this update job runs every 5 minutes so that we can have near real time updated data in Snowflake.
However, we have tried all the query optimisations we could think of and are unable to bring the execution time of the updates below 5 minutes.
We are using a Small warehouse in Snowflake, and due to budget constraints we can't scale it up further to speed up the update query.
Is there any other budget-friendly way to do near real time updates (after ingesting the data) in Snowflake?