Loading XML from GCS into BigQuery with Dataflow using dynamic table destinations (facing a latency issue)
I am new to Dataflow and I am stuck on the issue below.
Problem statement: I need a Dataflow job (Python) to load XML from GCS into BigQuery (batch load). The destination table in BigQuery is dynamic and is computed at run time from the XML file name.
Solution decided: I followed this article: https://medium.com/google-cloud/how-to-load-xml-data-into-bigquery-using-python-dataflow-fd1580e4af48. There a static table is used in the WriteToBigQuery transform, whereas I use a dynamic table name obtained via a callable. (Code attached for reference.)
Code:
import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage

# List the GCS file objects
# ToDo: Use Apache Beam GCSIO match patterns to list the file objects
storage_client = storage.Client()
bucket_name = "xmltobq"
bucket = storage_client.get_bucket(bucket_name)
blobs = list(bucket.list_blobs(prefix="xmlfiles/"))
blob_files = [blob.name for blob in blobs if ".xml" in blob.name]

# Static schema
table_schema = {
    "fields": [
        {'name': 'filename', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'CustomerID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'EmployeeID', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'OrderDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'RequiredDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'ShipInfo', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
            {'name': 'ShipVia', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'Freight', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipName', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipAddress', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCity', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipRegion', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipPostalCode', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShipCountry', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'ShippedDate', 'type': 'STRING', 'mode': 'NULLABLE'},
        ]},
    ]
}


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as p:

        def readfiles(element):
            '''
            Input PCollection: GCS element path
            Output PCollection: (XML, filename)
            '''
            # Simple XML conversion using the xmltodict package
            # ToDo: Once the specific XML paths are known, parse only the required fields
            import xmltodict
            import apache_beam as beam
            gcs_file = beam.io.filesystems.FileSystems.open("gs://xmltobq/" + element)
            parsed_xml = xmltodict.parse(gcs_file)
            return parsed_xml, element.split("/")[1].split(".")[0]

        def xmlformatting(element):
            '''
            Input PCollection: XML
            Output PCollection: a generator of modified XML elements
            '''
            data, filename = element
            for order in data['Root']['Orders']['Order']:
                yield formatting(order, filename)

        # def tablename(e):
        #     import re
        #     return "gcp-bq-2021:dataset4." + re.sub(r"[\s+,(,)]", "", e)

        def formatting(order, filename):
            '''
            Input PCollection: (XML element, filename)
            Output PCollection: modified XML
            ToDo: This only handles the sample XML; production code will have a different
            formatting process.
            '''
            import copy
            import re
            order_copy = copy.deepcopy(order)
            if "@ShippedDate" in order['ShipInfo']:
                order_copy['ShipInfo']['ShippedDate'] = order['ShipInfo']['@ShippedDate']
                del order_copy['ShipInfo']['@ShippedDate']
            order_copy['filename'] = "gcp-bq-2021:testdataset." + re.sub(r"[\s+,(,)]", "", filename)
            return order_copy

        # Dynamic table name, same as the input XML file name: the table reference is added
        # as the 'filename' key of each dictionary and read back in WriteToBigQuery.
        # ToDo: In production code a dynamic schema option will be included in the WriteToBigQuery transform
        pipeline_data = (
            p
            | "Create GCS Object List" >> beam.Create(blob_files)
            | "XMLConversion" >> beam.Map(readfiles)
            | "XMLformatting" >> beam.FlatMap(xmlformatting)
            | "shuffle" >> beam.Reshuffle()
            | beam.io.WriteToBigQuery(
                table=lambda row: row['filename'],  # callable returning the dynamic table name
                schema=table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # or WRITE_TRUNCATE
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location="gs://xmltobq"))


if __name__ == "__main__":
    run()
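To make the dynamic-destination logic concrete, here is a small standalone illustration (not part of the job) of how a GCS object name ends up as the table reference returned by the table= callable. The sample object name is hypothetical; the project and dataset are the ones used in the code above.

import re

# Hypothetical sample object; the job lists objects under "xmlfiles/".
element = "xmlfiles/Order (1).xml"

# Same steps as readfiles()/formatting(): drop the prefix and extension,
# then strip whitespace, '+', ',' and parentheses from the file name.
filename = element.split("/")[1].split(".")[0]          # "Order (1)"
table = "gcp-bq-2021:testdataset." + re.sub(r"[\s+,(,)]", "", filename)
print(table)  # prints: gcp-bq-2021:testdataset.Order1

So every distinct XML file maps to its own destination table.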
ISSUE:
I am able to run the job successfully on the Dataflow runner and the files do land in BigQuery. But WriteToBigQuery takes far too long, especially the ParDo(TriggerCopyJobs) step, where the throughput is roughly 1 element/second or less.
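To gauge how many distinct destination tables (and hence load/copy jobs) the write has to trigger, I could count rows per destination before WriteToBigQuery. This is only a hypothetical debugging fragment, not part of the job above; formatted_rows stands for the PCollection produced by the "XMLformatting" step.

# Hypothetical debug branch: count rows per destination table.
(formatted_rows
 | "KeyByDestination" >> beam.Map(lambda row: (row['filename'], 1))
 | "RowsPerDestination" >> beam.CombinePerKey(sum)
 | "LogCounts" >> beam.Map(print))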
With a single static table instead of a dynamic one, the same job completes lightning fast.
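For reference, the fast single-table variant differs only in the table argument of WriteToBigQuery; it looks roughly like this (the static table name here is just illustrative):

pipeline_data = (p | "Create GCS Object List" >> beam.Create(blob_files)
                   | "XMLConversion" >> beam.Map(readfiles)
                   | "XMLformatting" >> beam.FlatMap(xmlformatting)
                   | "shuffle" >> beam.Reshuffle()
                   | beam.io.WriteToBigQuery(
                         table="gcp-bq-2021:testdataset.orders_all",  # static table; name is illustrative
                         schema=table_schema,
                         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                         custom_gcs_temp_location="gs://xmltobq"))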
Is there anything I am doing wrong that is preventing parallel processing?
Machine type used: n1-highcpu-8.
JobID: 2022-03-09_07_28_47-10567887862012507747