Dataflow code fails with DataflowRunner but runs fine on DirectRunner

I have around 20 columns in my input file, but in the output file (structured_cols) I need only the 10 columns that I have specified in Structured_zone_header: any input column whose name matches a column name in Structured_zone_header should be selected into the output file. The code runs fine with the DirectRunner but fails with the DataflowRunner.
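For reference, the column selection on its own is plain pandas; a minimal sketch (the toy columns stand in for the real ~20-column file, and UNWANTED_COL is a made-up placeholder):

import pandas as pd

# Toy frame standing in for the real ~20-column CSV.
df = pd.DataFrame({
    'ORDER_NO': [1, 2],
    'ORDER_DATE': ['2021-01-01', '2021-01-02'],
    'UNWANTED_COL': ['x', 'y'],
})

wanted = ['ORDER_NO', 'ORDER_DATE']  # subset of Structured_zone_header
subset = df.loc[:, df.columns.isin(wanted)]
print(subset.columns.tolist())  # prints ['ORDER_NO', 'ORDER_DATE']

The full pipeline is below: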
import apache_beam as beam
import pandas as pd
import csv
import io
import IPython


class ComputeWordLengthFn(beam.DoFn):
    def process(self, readable_file):
        # Open the CSV file on GCS.
        gcs_file = beam.io.filesystems.FileSystems.open(readable_file)
        # Read it as csv; you can also use csv.reader
        csv_dict = csv.DictReader(io.TextIOWrapper(gcs_file))
        # Create the DataFrame
        df = pd.DataFrame(csv_dict)
        input_zone_header = list(df.columns.values)
        Structured_zone_header = ['ORDER_NO', 'ORDER_DATE', 'EFFECTIVE_ORDER_DATE', 'NODE',
                                  'ORDER_STATUS', 'ORDER_TYPE', 'SERVICE_LEVEL', 'ORDER_QUANTITY',
                                  'CUSTOMER_CITY', 'CUSTOMER_STATE', 'CUSTOMER_COUNTRY',
                                  'CUSTOMER_ZIP_CODE', 'PROMISE_DATE', 'SHIP_DATE', 'DELIVERY_DATE',
                                  'CARRIER', 'CARRIER_SERVICE_CODE', 'TRACKING_NO', 'SHIP_VIA']
        # Keep only the columns whose names appear in Structured_zone_header.
        structured_cols = df.loc[:, df.columns.isin(Structured_zone_header)]
        print(structured_cols)
        # Write the filtered columns back to GCS.
        rows = structured_cols.to_csv('gs://flc-raw-zone/processed/Base_Data.csv', index=False)
        yield rows
        # print(dataFrame.to_string())


def run(project, source_bucket, target_bucket):
    import csv
    import apache_beam as beam

    options = {
        'staging_location': 'gs://flc_df_stg_folders/staging',
        'temp_location': 'gs://flc_df_temp_folders/temp',
        'job_name': 'dataflow-flc-gcs-gcs',
        'region': 'us-west1',
        'project': 'sandbox-tredence',
        'max_num_workers': 24,
        'runner': 'DataflowRunner',
        'save_main_session': 'True'
    }
    options = beam.pipeline.PipelineOptions(flags=[], **options)
    input_data = 'gs://{}/input/Base_data.csv'.format(source_bucket)
    processed_ds = 'gs://{}/processed/transformed-Base_data'.format(target_bucket)

    pipeline = beam.Pipeline(options=options)
    rows = (
        pipeline
        | beam.Create([input_data])
        | beam.ParDo(ComputeWordLengthFn())
    )
    pipeline.run()


if __name__ == '__main__':
    print('Run pipeline on the cloud')
    run(project='sandbox-tredence', source_bucket='flc-raw-zone', target_bucket='flc-raw-zone')
The error that I get back reads as:
ERROR:
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 770, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 508, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 65, in load_session
    return desired_pickle_lib.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/dill_pickler.py", line 313, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'

2) S01:Create/Read+ParDo(ComputeWordLengthFn) failed., Internal Issue (351826dbea8c178f): 63963027:24514
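For what it's worth, the traceback points at the module-level import IPython: with save_main_session enabled, Dataflow pickles the main module's globals, and the worker fails while unpickling them because IPython is not installed in the worker environment. A minimal sketch of the two usual workarounds, assuming the pipeline does not actually use IPython (requirements.txt here is a hypothetical file that would list ipython):

import apache_beam as beam

# Workaround 1: delete the unused module-level import so the pickled
# main session no longer references IPython.
# import IPython   # <- remove this line

# Workaround 2: keep the import, but ship a requirements file so the
# package gets installed on every Dataflow worker before unpickling.
options = beam.pipeline.PipelineOptions(
    flags=[],
    runner='DataflowRunner',
    requirements_file='requirements.txt',  # hypothetical file containing: ipython
    save_main_session=True,  # a boolean, rather than the string 'True'
    # ... staging_location, temp_location, project, region, etc. as above ...
)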