Dataflow streaming pipeline error: "'get_message_id' is not defined". It works locally with DirectRunner but not on Dataflow

Posted 2025-02-07 09:21:57

I'm having a weird error when running a streaming pipeline in Dataflow.

I have tested the pipeline locally through DirectRunner and it works properly, but when I run it on Dataflow this happens:

[Screenshot of the Dataflow job log showing the error: name 'get_message_id' is not defined (https://i.sstatic.net/deukh.png)]

It seems like it's not finding the get_message_id function, but it's actually there...

Any thoughts?

I have shortened the code just to focus on the relevant parts here, but the entire workflow is much bigger.

EDIT:

Any function or constant declared globally (outside the classes) is not recognized by Dataflow. In this case, the function that is not recognized is evaluate_filter, because it is the first one to be executed. If I delete this function, the get_message_id function fails too.

I'm using:

  • Python 3.8
  • Apache Beam 2.39.0
  • Dataflow classic template

I'm creating the template with this command through Cloud Shell:

python3.8 -m ingestion_pipeline \
--runner DataflowRunner \
--project PROJECT_ID \
--staging_location gs://BUCKET_NAME/staging \
--temp_location gs://BUCKET_NAME/temp \
--template_location gs://BUCKET_NAME/templates/ingestion_pipeline \
--region europe-west1 \
--streaming 

Code (Shortened):

import uuid
import json

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import pubsub_v1


# region *********************************  Declare constants  ********************************
BIGQUERY_SCHEMA = 'Publish_time:TIMESTAMP, Payload:STRING, Message_id: STRING'

# endregion *********************************  Declare constants  ************************************


# region *********************************  Input parameters  *********************************
class InputArguments(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ------------------- IN/OUT arguments -----------------------
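        # ValueProvider arguments are resolved at run time, which is what a
        # classic Dataflow template needs for runtime parameters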
        parser.add_value_provider_argument(
            '--output_table',
            dest='output_table',
            default='xxxxxxxxxxx',
            help='Output table',
        )
        parser.add_value_provider_argument(
            '--output_schema',
            dest='output_schema',
            default=BIGQUERY_SCHEMA,
            help='Output BigQuery Schema in text format',
        )


# endregion *********************************  Input parameters  *********************************


# region *********************************  Aux functions  ************************************
def get_pubsub_topic():

    project_id = "XXXXXXXX"
    topic_id = "XXXXXXXXXXXX"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project_id, topic_id)

    print(topic_path)

    return topic_path


def get_message_id(message_id):
    print('get_message_id function - start')
    message_id = uuid.uuid4() if message_id == '' else message_id
    print('get_message_id function - end')
    return str(message_id)


def evaluate_filter():

    print('evaluate_filter function - start')

    print('evaluate_filter function - end')
    return True

# endregion *********************************  Aux functions  ************************************

# region *********************************  PCollections  *************************************


class CustomParsing(DoFn):
    """
        Parse messages to store the payload in BigQuery.
    """
    def process(self, element):
        """
        Simple processing function to parse the data and add a timestamp
        For additional params see:
        https://beam.apache.org/releases/pydoc/2.7.0/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
        """

        print('CustomParsing PCollection - start')

        payload = element.data.decode("utf-8")
        publish_time = element.publish_time.timestamp()
        
        # Use a distinct local name so the module-level evaluate_filter function is not shadowed
        filter_result = evaluate_filter()

        message_id = get_message_id(element.message_id)

        data_set = {
            'Payload': payload,
            'Publish_time': publish_time,
            'Message_id': message_id
        }

        json_dump = json.dumps(data_set)
        json_data = json.loads(json_dump)

        print('CustomParsing PCollection - end')
        yield json_data


# endregion *********************************  PCollections  **********************************

def run(save_main_session=True):
    """
    Build and run Pipeline
    """
    pipeline_options = PipelineOptions(
        streaming=True
    )
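    # save_main_session pickles the state of the __main__ module so that
    # globally defined functions and imports are available on the workers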
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

    pubsub_topic = get_pubsub_topic()

    # --------------------------- Run pipeline ----------------------------------
    with Pipeline(options=pipeline_options) as pipeline:
        input_args = pipeline_options.view_as(InputArguments)
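        # with_attributes=True yields PubsubMessage objects (data, attributes,
        # message_id, publish_time) instead of raw payload bytes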
        rows = (pipeline | 'Read from Pub/Sub' >> io.ReadFromPubSub(pubsub_topic, with_attributes=True))

        write_bq = (
                rows
                | 'Parse messages' >> ParDo(CustomParsing())
                | "Write to BigQuery" >> io.WriteToBigQuery(input_args.output_table,
                                                            input_args.output_schema,
                                                            write_disposition=io.BigQueryDisposition.WRITE_APPEND)
        )


if __name__ == '__main__':
    run()


Answer by 久而酒知, 2025-02-14 09:21:57


This works for my Dataflow pipeline:

Put the external method and the custom DoFn into separate files and import the former into the latter. Next, import the DoFn into your main file and use a setup.py (described here) for the deployment of the Dataflow job.

Don't forget to put an empty __init__.py within your other_files_dir/
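
As a minimal sketch of that layout (the package name ingestion_lib, the file names and the setup.py contents below are illustrative assumptions, not part of the original answer):

# ingestion_lib/__init__.py  (empty file)

# ingestion_lib/helpers.py
import uuid


def evaluate_filter():
    return True


def get_message_id(message_id):
    # Fall back to a generated UUID when Pub/Sub did not supply an id
    return str(uuid.uuid4()) if message_id == '' else str(message_id)


# ingestion_lib/parsing.py
from apache_beam import DoFn

from ingestion_lib.helpers import evaluate_filter, get_message_id


class CustomParsing(DoFn):
    def process(self, element):
        if not evaluate_filter():
            return
        yield {
            'Payload': element.data.decode('utf-8'),
            'Publish_time': element.publish_time.timestamp(),
            'Message_id': get_message_id(element.message_id),
        }


# setup.py  (at the repository root)
import setuptools

setuptools.setup(
    name='ingestion_lib',
    version='0.0.1',
    packages=setuptools.find_packages(),
)

The main pipeline file then imports CustomParsing from ingestion_lib.parsing, and the template creation command gets an extra --setup_file=./setup.py flag so the package is installed on the Dataflow workers.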
