数据流代码无法使用 Dataflow Runner 执行,但可以在 DirectRunner 上正常运行

发布于 2025-01-09 16:26:49 字数 3615 浏览 2 评论 0原文

我的输入文件中大约有 20 列,但在输出文件(structured_cols)中,我只需要 Structured_zone_header 中指定的 10 列,即输入文件中列名与 Structured_zone_header 里的列名相匹配的那些列。

这段代码在 DirectRunner 上运行良好,但在 DataflowRunner 上无法正常运行。

import apache_beam as beam
import pandas as pd
import csv
import io
import IPython

class ComputeWordLengthFn(beam.DoFn):
    """Copy a fixed subset of columns from a CSV file into an output CSV.

    Despite its name, this DoFn does not compute word lengths: it loads one
    CSV file into a DataFrame, keeps only the structured-zone columns and
    writes the result to a hard-coded ``gs://`` path.
    """

    def process(self, readable_file):
        """Filter one CSV file down to the structured-zone columns.

        Args:
            readable_file: Path of the CSV file to read; it is opened through
                Beam's ``FileSystems.open``.

        Yields:
            The return value of ``DataFrame.to_csv``. pandas returns ``None``
            when a destination path is supplied, so the emitted element
            carries no data.
        """
        structured_zone_header = [
            'ORDER_NO', 'ORDER_DATE', 'EFFECTIVE_ORDER_DATE', 'NODE',
            'ORDER_STATUS', 'ORDER_TYPE', 'SERVICE_LEVEL', 'ORDER_QUANTITY',
            'CUSTOMER_CITY', 'CUSTOMER_STATE', 'CUSTOMER_COUNTRY',
            'CUSTOMER_ZIP_CODE', 'PROMISE_DATE', 'SHIP_DATE', 'DELIVERY_DATE',
            'CARRIER', 'CARRIER_SERVICE_CODE', 'TRACKING_NO', 'SHIP_VIA',
        ]

        gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
        # Closing the text wrapper also closes the underlying Beam file
        # handle, so it is released as soon as the rows are loaded.
        with io.TextIOWrapper(gcs_file) as text_stream:
            # DictReader is lazy: the DataFrame has to be materialised while
            # the stream is still open.
            df = pd.DataFrame(csv.DictReader(text_stream))

        # Keep only the input columns whose names appear in the header list.
        structured_cols = df.loc[:, df.columns.isin(structured_zone_header)]
        print(structured_cols)
        rows = structured_cols.to_csv(
            'gs://flc-raw-zone/processed/Base_Data.csv', index=False)
        yield rows

def run(project, source_bucket, target_bucket):
    """Build the single-file CSV pipeline and submit it with DataflowRunner.

    Args:
        project: Google Cloud project ID passed to the pipeline options.
        source_bucket: Name of the bucket that holds ``input/Base_data.csv``.
        target_bucket: Accepted for interface compatibility; it is not used
            here because ``ComputeWordLengthFn`` writes to a fixed path.
    """
    options = beam.pipeline.PipelineOptions(
        flags=[],
        staging_location='gs://flc_df_stg_folders/staging',
        temp_location='gs://flc_df_temp_folders/temp',
        job_name='dataflow-flc-gcs-gcs',
        region='us-west1',
        project=project,
        max_num_workers=24,
        runner='DataflowRunner',
        # Pickles the main module's global state for the workers, so every
        # module imported at the top level of this file must also be
        # installed on the workers or the session fails to load there.
        save_main_session=True,
    )

    input_data = 'gs://{}/input/Base_data.csv'.format(source_bucket)

    pipeline = beam.Pipeline(options=options)

    # The DoFn writes its own output, so the resulting PCollection is unused.
    _ = (
        pipeline
        | beam.Create([input_data])
        | beam.ParDo(ComputeWordLengthFn())
    )

    pipeline.run()


if __name__ == '__main__':
    # Script entry point: announce the run, then launch the job with the
    # same bucket serving as both source and target.
    print('Run pipeline on the cloud')
    raw_zone_bucket = 'flc-raw-zone'
    run(
        project='sandbox-tredence',
        source_bucket=raw_zone_bucket,
        target_bucket=raw_zone_bucket,
    )

我得到的错误信息如下:

ERROR :
Error message from worker: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 770, in run self._load_main_session(self.local_staging_directory) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 508, in _load_main_session pickler.load_session(session_file) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 65, in load_session return desired_pickle_lib.load_session(file_path) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/dill_pickler.py", line 313, in load_session return dill.load_session(file_path) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session module = unpickler.load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'IPython'


2) S01:Create/Read+ParDo(ComputeWordLengthFn) failed., Internal Issue (351826dbea8c178f): 63963027:24514

I have around 20 columns in my input file, but in the output file (structured_cols) I need only the 10 columns that I have specified in Structured_zone_header. Only the input columns whose names match the column names in Structured_zone_header should be selected for the output file.

The code runs fine with the DirectRunner, but it does not run with the DataflowRunner.

import apache_beam as beam
import pandas as pd
import csv
import io
import IPython

class ComputeWordLengthFn(beam.DoFn):
    """Copy a fixed subset of columns from a CSV file into an output CSV.

    Despite its name, this DoFn does not compute word lengths: it loads one
    CSV file into a DataFrame, keeps only the structured-zone columns and
    writes the result to a hard-coded ``gs://`` path.
    """

    def process(self, readable_file):
        """Filter one CSV file down to the structured-zone columns.

        Args:
            readable_file: Path of the CSV file to read; it is opened through
                Beam's ``FileSystems.open``.

        Yields:
            The return value of ``DataFrame.to_csv``. pandas returns ``None``
            when a destination path is supplied, so the emitted element
            carries no data.
        """
        structured_zone_header = [
            'ORDER_NO', 'ORDER_DATE', 'EFFECTIVE_ORDER_DATE', 'NODE',
            'ORDER_STATUS', 'ORDER_TYPE', 'SERVICE_LEVEL', 'ORDER_QUANTITY',
            'CUSTOMER_CITY', 'CUSTOMER_STATE', 'CUSTOMER_COUNTRY',
            'CUSTOMER_ZIP_CODE', 'PROMISE_DATE', 'SHIP_DATE', 'DELIVERY_DATE',
            'CARRIER', 'CARRIER_SERVICE_CODE', 'TRACKING_NO', 'SHIP_VIA',
        ]

        gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
        # Closing the text wrapper also closes the underlying Beam file
        # handle, so it is released as soon as the rows are loaded.
        with io.TextIOWrapper(gcs_file) as text_stream:
            # DictReader is lazy: the DataFrame has to be materialised while
            # the stream is still open.
            df = pd.DataFrame(csv.DictReader(text_stream))

        # Keep only the input columns whose names appear in the header list.
        structured_cols = df.loc[:, df.columns.isin(structured_zone_header)]
        print(structured_cols)
        rows = structured_cols.to_csv(
            'gs://flc-raw-zone/processed/Base_Data.csv', index=False)
        yield rows

def run(project, source_bucket, target_bucket):
    """Build the single-file CSV pipeline and submit it with DataflowRunner.

    Args:
        project: Google Cloud project ID passed to the pipeline options.
        source_bucket: Name of the bucket that holds ``input/Base_data.csv``.
        target_bucket: Accepted for interface compatibility; it is not used
            here because ``ComputeWordLengthFn`` writes to a fixed path.
    """
    options = beam.pipeline.PipelineOptions(
        flags=[],
        staging_location='gs://flc_df_stg_folders/staging',
        temp_location='gs://flc_df_temp_folders/temp',
        job_name='dataflow-flc-gcs-gcs',
        region='us-west1',
        project=project,
        max_num_workers=24,
        runner='DataflowRunner',
        # Pickles the main module's global state for the workers, so every
        # module imported at the top level of this file must also be
        # installed on the workers or the session fails to load there.
        save_main_session=True,
    )

    input_data = 'gs://{}/input/Base_data.csv'.format(source_bucket)

    pipeline = beam.Pipeline(options=options)

    # The DoFn writes its own output, so the resulting PCollection is unused.
    _ = (
        pipeline
        | beam.Create([input_data])
        | beam.ParDo(ComputeWordLengthFn())
    )

    pipeline.run()


if __name__ == '__main__':
    # Script entry point: announce the run, then launch the job with the
    # same bucket serving as both source and target.
    print('Run pipeline on the cloud')
    raw_zone_bucket = 'flc-raw-zone'
    run(
        project='sandbox-tredence',
        source_bucket=raw_zone_bucket,
        target_bucket=raw_zone_bucket,
    )

The error that I get back reads as follows:

ERROR :
Error message from worker: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 770, in run self._load_main_session(self.local_staging_directory) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 508, in _load_main_session pickler.load_session(session_file) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 65, in load_session return desired_pickle_lib.load_session(file_path) File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/dill_pickler.py", line 313, in load_session return dill.load_session(file_path) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session module = unpickler.load() File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load obj = StockUnpickler.load(self) File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class return StockUnpickler.find_class(self, module, name) ModuleNotFoundError: No module named 'IPython'


2) S01:Create/Read+ParDo(ComputeWordLengthFn) failed., Internal Issue (351826dbea8c178f): 63963027:24514

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文