Dataflow streaming pipeline error: 'get_message_id' is not defined. It works locally with DirectRunner but not on Dataflow
I'm getting a weird error when running a streaming pipeline on Dataflow.
I have tested the pipeline locally through DirectRunner and it works properly, but when I run it on Dataflow this happens:
(Screenshot of the error: https://i.sstatic.net/deukh.png)
It seems like it can't find the get_message_id function, but it's actually there...
Any thoughts?
I have shortened the code to focus on the relevant parts here, but the entire workflow is much bigger.
EDIT:
No function or constant declared globally (outside the classes) is recognized by Dataflow. In this case, the function that is not recognized is evaluate_filter, because it is executed first. If I delete that function, the get_message_id function fails too.
I'm using:
- Python 3.8
- Apache Beam 2.39.0
- Dataflow classic template
I'm creating the template with this command through Cloud Shell:
python3.8 -m ingestion_pipeline \
--runner DataflowRunner \
--project PROJECT_ID \
--staging_location gs://BUCKET_NAME/staging \
--temp_location gs://BUCKET_NAME/temp \
--template_location gs://BUCKET_NAME/templates/ingestion_pipeline \
--region europe-west1 \
--streaming
Code (Shortened):
import uuid
import json

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import pubsub_v1

# region ********************************* Declare constants ********************************
BIGQUERY_SCHEMA = 'Publish_time:TIMESTAMP, Payload:STRING, Message_id: STRING'
# endregion ********************************* Declare constants ************************************

# region ********************************* Input parameters *********************************
class InputArguments(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ------------------- IN/OUT arguments -----------------------
        parser.add_value_provider_argument(
            '--output_table',
            dest='output_table',
            default='xxxxxxxxxxx',
            help='Output table',
        )
        parser.add_value_provider_argument(
            '--output_schema',
            dest='output_schema',
            default=BIGQUERY_SCHEMA,
            help='Output BigQuery Schema in text format',
        )
# endregion ********************************* Input parameters *********************************

# region ********************************* Aux functions ************************************
def get_pubsub_topic():
    project_id = "XXXXXXXX"
    topic_id = "XXXXXXXXXXXX"
    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project_id, topic_id)
    print(topic_path)
    return topic_path


def get_message_id(message_id):
    print('get_message_id function - start')
    message_id = uuid.uuid4() if message_id == '' else message_id
    print('get_message_id function - end')
    return str(message_id)


def evaluate_filter():
    print('evaluate_filter function - start')
    print('evaluate_filter function - end')
    return True
# endregion ********************************* Aux functions ************************************

# region ********************************* PCollections *************************************
class CustomParsing(DoFn):
    """
    Parse messages to store the payload in BigQuery.
    """
    def process(self, element):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """
        print('CustomParsing PCollection - start')
        payload = element.data.decode("utf-8")
        publish_time = element.publish_time.timestamp()
        filter_passed = evaluate_filter()
        message_id = get_message_id(element.message_id)
        data_set = {
            'Payload': payload,
            'Publish_time': publish_time,
            'Message_id': message_id
        }
        json_dump = json.dumps(data_set)
        json_data = json.loads(json_dump)
        print('CustomParsing PCollection - end')
        yield json_data
# endregion ********************************* PCollections **********************************

def run(save_main_session=True):
    """
    Build and run Pipeline
    """
    pipeline_options = PipelineOptions(
        streaming=True
    )
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    pubsub_topic = get_pubsub_topic()
    # --------------------------- Run pipeline ----------------------------------
    with Pipeline(options=pipeline_options) as pipeline:
        input_args = pipeline_options.view_as(InputArguments)
        rows = (pipeline | 'Read from Pub/Sub' >> io.ReadFromPubSub(pubsub_topic, with_attributes=True))
        write_bq = (
            rows
            | 'Parse messages' >> ParDo(CustomParsing())
            | "Write to BigQuery" >> io.WriteToBigQuery(input_args.output_table,
                                                        schema=input_args.output_schema,
                                                        write_disposition=io.BigQueryDisposition.WRITE_APPEND)
        )


if __name__ == '__main__':
    run()
1 Answer
This works for my Dataflow pipeline:
Put the external methods and the custom DoFn into separate files and import the former into the latter. Next, import the DoFn into your main file and use a setup.py (described here) for the Dataflow deployment.
Don't forget to put an empty __init__.py within your other_files_dir/.
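A minimal sketch of what that restructuring could look like. All file and package names below (my_pipeline_pkg, helpers.py, parsing.py) are illustrative placeholders, not names from the question or the answer:

# Hypothetical layout (placeholder names):
#
#   main.py               -> builds the pipeline; does: from my_pipeline_pkg.parsing import CustomParsing
#   setup.py              -> shown below
#   my_pipeline_pkg/
#       __init__.py       -> empty file, makes the directory an importable package
#       helpers.py        -> get_pubsub_topic, get_message_id, evaluate_filter
#       parsing.py        -> the CustomParsing DoFn; does: from my_pipeline_pkg.helpers import evaluate_filter, get_message_id

# setup.py -- packages the local modules so the Dataflow workers can import them
# by module path instead of looking them up in the pickled __main__ session.
import setuptools

setuptools.setup(
    name='my_pipeline_pkg',
    version='0.0.1',
    packages=setuptools.find_packages(),
    # any non-Beam dependencies the helpers need, e.g. the Pub/Sub client
    install_requires=['google-cloud-pubsub'],
)

With that in place, the template-creation command from the question would also pass --setup_file ./setup.py, so the package is staged and installed on every worker.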