如何将Kafka IO从Apache Beam连接到Confluent Cloud中的群集
我已经在Python读了一个简单的管道,可以从Kafka阅读,这是Kafka群集在Confluent Cloud上,并且在连接它时遇到了一些麻烦。 我会在数据流工作中获取以…
在Google DataFlow管道中构建容器
tl; dr Apache Beam管道步骤涉及构建Docker图像;如何使用Google DataFlow运行此管道?存在哪些选择? 目前,我正在尝试使用Google的数据流服务和Apa…
数据流管道不以 -v 参数启动
我正在编写一个 apache beam 管道,用于在 python 中处理大数据文件。此管道的参数由 argparse 和 PipelineOptions 管理。我在 _add_argparse_args() …
无法从Google DataFlow连接到SSL启用的弹性搜索
无法使用Google Cloud DataFlow连接到SSL启用弹性搜索。我已经使用了Google提供的模板,来自 git 我修改了源代码以通过keystore,keystorepassword,k…
GCS Bucket 文件路径作为 Cloud Function - GCP 触发的数据流管道的输入
我正在尝试使用云函数(创建/最终确定)触发器,以使GCS存储键启动数据流管线。我正在尝试弄清楚如何在触发时将GCS存储桶中的CSV文件路径提供给自定义…
列表索引必须是整数或切片,而不是数据流管道中的 str 错误
我正在运行以下管道以将数据从GCS中摄取到BQ。 import argparse import logging import re import json import apache_beam as beam from apache_beam…
Apache Beam Initializer
在我的DataFlow作业中,我需要在实际处理开始之前初始化配置工厂并在审核日志中记录某些消息。 我已经将配置工厂初始化代码 +审核记录放在父级 platfo…
在流管道数据流之后移动文件
我创建了一个流式Apache Beam管道,该流程管线从GCS文件夹中读取文件并将其插入Boogtable,它可以很好地工作,但我想将proccessed文件移动在新的GCS文…
通过 GCP 云调度程序传递动态值
我需要使用云调度程序的随机/动态job_names触发Google DataFlow作业。我能够传递静态作业名称并通过在调度程序主体中传递数据流。有没有办法通过调度…
GCP Dataflow 发布/订阅到发布/订阅
我正在使用 gcp pub/sub。如果接受新消息时出现异常,我会将其放入死信队列。 我希望有机会在审核后重新处理邮件。我尝试使用数据流,但它只是在消息…
Beam Dataflow python 作业在 dist_proc/dax/shuffle/batch/chunking_shuffle_writer.cc#146 中失败
我的数据流作业反复失败并显示以下日志消息: 022-03-27T01:39:21.871476411Z尝试执行工作项 1257504293434498471 时引发异常:回溯(最近一次调用)…
elastic_enterprise_search.AppSearch 客户端在 GCloud Dataflow 上的 python sdk 中失败,并出现 urllib3 证书错误
我正在开发一个写入 Elastic Search App Search (elastic_enterprise_search.AppSearch) 的 DoFn。当我使用 DirectRunner 运行管道时,它工作得很好。…
Java Apache Beam 中的 PcollectionView 阶段数据不流动
从 Bigquery 读取数据后,我必须将数据作为侧输入发送到下一级。所以下面是我遵循的步骤 - Read BQ Convert PCollection to PCollection> 将 Pcollect…
云调度程序 - Terrafrom - 如何在 CloudSheduler 主体中传递当前日期
我使用 cloudScheduler 来安排数据流作业。我需要使用 cloudsheduler 主体将运行日期/运行时/当前日期动态传递给数据流作业。 正在使用数据流作业代码…