Loading XML data into BigQuery via Dataflow with dynamic table destinations (facing a latency issue)


I am new to Dataflow and am stuck on the issue below.

Problem statement: I need a Dataflow job (Python) to load XML from GCS into BigQuery (batch load). The destination table in BigQuery is dynamic and is computed at run time from the XML file name.

Solution decided: I followed the article https://medium.com/google-cloud/how-to-load-xml-data-into-bigquery-using-python-dataflow-fd1580e4af48. There, a static table was used in the WriteToBigQuery transform, whereas I am using a dynamic table name obtained via a callable function (code attached for reference).

Job graph: [screenshot of the Dataflow job graph]

Code:

import apache_beam as beam
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage

# List the gcs file objects
# ToDo: Use Apache Beam GCSIO Match Patterns To list the file objects
storage_client = storage.Client()
bucket_name = "xmltobq"
bucket = storage_client.get_bucket(bucket_name)
blobs = list(bucket.list_blobs(prefix="xmlfiles/"))
blob_files = [blob.name for blob in blobs if ".xml" in blob.name]

#Static schema
table_schema = {
"fields": [
    {'name' : 'filename', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name' : 'CustomerID', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name' : 'EmployeeID', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name' : 'OrderDate', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name' : 'RequiredDate', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name' : 'ShipInfo', 'type': 'RECORD', 'mode': 'NULLABLE', 'fields': [
        {'name' : 'ShipVia', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'Freight', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipName', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipAddress', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipCity', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipRegion', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipPostalCode', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShipCountry', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name' : 'ShippedDate', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]},
]
}

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as p:

        def readfiles(element):
            '''
            Input: a GCS object path (relative to the bucket)
            Output: (parsed XML dict, file name without extension)
            '''
            # Simple XML conversion using the xmltodict package
            # ToDo: Once specific XML paths are acquired, we can parse only the required fields
            import xmltodict
            import apache_beam as beam
            gcs_file = beam.io.filesystems.FileSystems.open("gs://xmltobq/" + element)
            parsed_xml = xmltodict.parse(gcs_file)
            return parsed_xml, element.split("/")[1].split(".")[0]

        def xmlformatting(element):
            '''
            Input: (parsed XML dict, file name)
            Output: a generator of formatted order records
            '''
            data, filename = element
            for order in data['Root']['Orders']['Order']:
                yield formatting(order, filename)

        # Earlier approach, kept commented out for reference:
        # def tablename(e):
        #     import re
        #     return "gcp-bq-2021:dataset4." + re.sub(r"[\s+,(,)]", "", e)

        def formatting(order, filename):
            '''
            Input: a single order element and its file name
            Output: the order dict reshaped to match the BigQuery schema
            ToDo: This only handles the sample XML; production code will have a
            different formatting process.
            '''
            import copy
            import re
            order_copy = copy.deepcopy(order)
            if "@ShippedDate" in order['ShipInfo']:
                order_copy['ShipInfo']['ShippedDate'] = order['ShipInfo']['@ShippedDate']
                del order_copy['ShipInfo']['@ShippedDate']
            # Store the full destination table spec on the row so the table
            # callable in WriteToBigQuery can read it back later.
            order_copy['filename'] = "gcp-bq-2021:testdataset." + re.sub(r"[\s+,(,)]", "", filename)
            return order_copy

        # Dynamic table name, same as the input XML file name: the table spec is added
        # as a key in each dictionary and read back inside WriteToBigQuery.
        # ToDo: In production code a dynamic schema option will be included in the
        # WriteToBigQuery transform.
        pipeline_data = (
            p
            | "Create GCS Object List" >> beam.Create(blob_files)
            | "XMLConversion" >> beam.Map(readfiles)
            | "XMLformatting" >> beam.FlatMap(xmlformatting)
            | "shuffle" >> beam.Reshuffle()
            | beam.io.WriteToBigQuery(
                table=lambda row: row['filename'],  # callable returning the dynamic table name
                schema=table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,  # or WRITE_TRUNCATE
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                custom_gcs_temp_location="gs://xmltobq"))


if __name__ == '__main__':
    run()
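
As flagged in the ToDo comment at the top of the code, the object listing could eventually move into the pipeline itself using Beam's own match patterns instead of the google.cloud.storage client. A minimal sketch of that idea (same bucket and prefix assumed; this is not part of the job that produced the numbers below):

import apache_beam as beam
from apache_beam.io import fileio

# Sketch only: list the input XML files with Beam's match patterns.
with beam.Pipeline() as p:
    xml_paths = (
        p
        | "Match XML files" >> fileio.MatchFiles("gs://xmltobq/xmlfiles/*.xml")
        | "Extract paths" >> beam.Map(lambda file_metadata: file_metadata.path))
    # Note: these are full gs:// paths, so readfiles would open them directly
    # instead of prefixing the bucket name.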

Sample XML file: [attachment]

Issue:
I am able to run the job successfully on the Dataflow runner and move the files into BigQuery. But the time spent in WriteToBigQuery is far too long, especially in the ParDo(TriggerCopyJobs) step, where the throughput stays below roughly 1 element/second.

If the destination is a single static table instead of a dynamic one, the job completes very quickly.
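
For reference, the fast single-table variant is the same write step with a fixed table spec instead of the callable (a sketch; the table name below is illustrative, not the one I actually used):

import apache_beam as beam

# Sketch only: static destination table instead of the per-row callable.
write_static = beam.io.WriteToBigQuery(
    table="gcp-bq-2021:testdataset.orders_static",  # illustrative table name
    schema=table_schema,  # the same schema dict defined in the code above
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    custom_gcs_temp_location="gs://xmltobq")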

Output throughput: [screenshot]

Is there anything I am doing wrong that is preventing parallel processing?

Machine type used: n1-highcpu-8.

JobID: 2022-03-09_07_28_47-10567887862012507747
