如何进行零停机时间升级和低延迟流GCP数据流?

发布于 2025-01-21 14:05:34 字数 1565 浏览 3 评论 0原文

我已经设置了用Python编写的自定义数据流程作业,该作业仅将图像(由PubSub消息Triggerd)从一个存储桶复制到另一个,然后发送HTTP请求。

它看起来像:

    messages = pipeline | 'On Pubsub Message' >> ReadFromPubSub(topic=config['pubsub_image_topic'])
    new_images = messages | 'Parse Pubsub Message' >> apache_beam.Map(parse_pubsub)
    valid_images = new_images | 'Attach Metadata' >> apache_beam.ParDo(AttachMetadata())
    collected_images = valid_images | 'Collect Image' >> apache_beam.Map(collect_image)
    copied_images = collected_images | 'Copy To Bucket' >> apache_beam.ParDo(CopyToBucketDoFun())
    data_bucket_images | 'Notify WebHook' >> apache_beam.ParDo(NotifyWebHookFn())

我在GCP数据流中运行它,例如以下参数以下参数,

python ./main.py \
  --job_name=$JOB_NAME \
  --streaming \
  --enable_streaming_engine \
  --runner="DataflowRunner" \
  --project=$GCP_PROJECT \
  --region=$REGION \
  --subnetwork=$SUBNETWORK \
  --staging_location=$STAGING_LOCATION \
  --no_use_public_ips \
  --setup_file ./setup.py \
  --service_account_email "${SERVICE_ACCOUNT}" \
  --requirements_file "requirements.txt" \
  --pubsub_image_topic $PUBSUB_IMAGE_TOPIC \
  --update

可以找到使用该名称的旧作业,然后停止它,复制状态,然后启动此作业并恢复状态。但是,这会导致9分钟的停机时间,而旧工作被停止,并且旧的工作正在初始化。是否有任何方法可以保持旧作业的运行,直到新作品准备好处理?

另外,我希望这条管道以最小的延迟运行。但是,即使单个消息的所有步骤都应花费不到半秒钟,该管道总是至少具有20秒的“数据新鲜度”。有什么方法可以告诉数据流以确定延迟的优先级,并尽快运行所有消息的所有步骤?并并行运行它们?它似乎在批处理中运行管道的每个步骤。就像它将读取8条消息一样,然后将解析所有8条消息,然后将收集所有8张图像,然后复制8个图像,然后调用Webhook 8次,然后重复。

我觉得这些功能是显而易见的,并且必须将数据流作为实时流媒体管道运行,但是我找不到任何内容。我想念什么?

I have set up a custom DataFlow job written in Python, that just copies an image (triggerd by a pubsub message) from one bucket to another and then sends a HTTP request.

It looks something like:

    messages = pipeline | 'On Pubsub Message' >> ReadFromPubSub(topic=config['pubsub_image_topic'])
    new_images = messages | 'Parse Pubsub Message' >> apache_beam.Map(parse_pubsub)
    valid_images = new_images | 'Attach Metadata' >> apache_beam.ParDo(AttachMetadata())
    collected_images = valid_images | 'Collect Image' >> apache_beam.Map(collect_image)
    copied_images = collected_images | 'Copy To Bucket' >> apache_beam.ParDo(CopyToBucketDoFun())
    data_bucket_images | 'Notify WebHook' >> apache_beam.ParDo(NotifyWebHookFn())

I run it in GCP DataFlow with arguments like:

python ./main.py \
  --job_name=$JOB_NAME \
  --streaming \
  --enable_streaming_engine \
  --runner="DataflowRunner" \
  --project=$GCP_PROJECT \
  --region=$REGION \
  --subnetwork=$SUBNETWORK \
  --staging_location=$STAGING_LOCATION \
  --no_use_public_ips \
  --setup_file ./setup.py \
  --service_account_email "${SERVICE_ACCOUNT}" \
  --requirements_file "requirements.txt" \
  --pubsub_image_topic $PUBSUB_IMAGE_TOPIC \
  --update

This finds the old job with that name, and stops it, copies the state, and then starts this job and restores the state. However, this causes 9 minutes of downtime while the old job is stopped and the old one is getting initialized. Is there any way to keep the old job running until the new one is ready to process?

Also, I would like this pipeline to run with minimal latency. However, this pipeline always has a minimum of 20 seconds "data freshness", even though all of the steps for a single message should take less than half a second. Is there any way to tell DataFlow to prioritize latency, and to run all of the steps for a message as soon as possible? And to run them in parallel? It seems to run each step of the pipeline in batch. Like it will read 8 messages, then it will parse all 8 messages, then it will collect all 8 images, and then copy 8 images, and then call the WebHook 8 times, and then repeat.

I feel like these features are obvious and necessary to have DataFlow run as a realtime streaming pipeline, but I can't find anything about it. What am I missing?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦魇绽荼蘼 2025-01-28 14:05:34

解决此问题的方法有几种方法

There is several ways to solve this

  • You can accept the downtime. According to your SLA, you should be able to stop your service for a while, consume your error budget, and start your new pipeline. It's the normal life of any application. 100% uptime is never the target!
  • You can run several pipeline in parallel. But your process must be idempotent. If not, that's not a suitable solution
  • You can update you current streaming pipeline. There is some limitation, but if it's fine, it's your best option!
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文