apache beam / dataflow gosdk管道不会处理任何pubsub消息
我希望每个人的星期五进展顺利。我拼命寻求Apache Beam Go SDK的帮助( https://github.com /apache/beam/tree/master/sdks )。我已经写了一条使用…
如何将objectId从bson封装导入到Apache Beam管道中(与Pymongo兼容)?
嗨,我正在使用Google DataFlow中的管道,该管道加载了一些MongoDB文档(带有Pymongo)并进行处理。在某个时候,我需要将ID投入到mongoDB objectid。 …
Apache Beam Fileio写下压缩文件
我想知道是否可以使用Apache Beam Python SDK的Fileio模块编写压缩文件。目前,我使用模块将文件写入GCP存储桶: _ = (logs | 'Window' >> beam.Windo…
Apache Beam Python SDK用于SQL窗口
问题是,我想在 SELECT f_timestamp, line, COUNT(*) FROM PCOLLECTION GROUP BY line, HOP(f_timestamp, INTERVAL '30' MINUTE, INTERVAL '1' HOUR) …
Apache Beam管道写入多个水槽
我有一个需要进行以下操作的方案: 从GCS中存储的文件中读取数据, 将多个转换应用于数据。 坚持Google大查询中的PCollection。 步骤3。成功时,将PCo…
Apache Beam WriteTokafka(Python SDK)dom do to to to to to to topector(没有错误的清单)
我正在尝试使用Writetokafka类Apache Beam(Python SDK)写入Kafka主题的流。但是,它无休止地运行脚本(没有错误),并且不会将流写入主题。我必须取…
TFX导致TypeCheckerror:输出类型提示违反[train]:Expector< class< apache_beam.pvalue.pvalue.pdone.pdone''
我尝试运行TFX文档中提供的教程,并且在通往元数据的路径方面面临问题。 我已经在本期中详细描述了查询 https://github.com/github.com/tensorflow/te…
Google DataFlow连续8个期间与JVM失败,用于全球合并
在我的管道中,我有大约400万个记录,流量如下 阅读BigQuery 的所有记录 阅读BigQuery 转换为Proto 全球组合并创建基于KV的SST文件,后来用于RockSDB …
Python Apache Beam WriteTotext设置最大大小的部分文件
我正在使用WritEtotext进行Google Cloud Storage。文件以这种格式生成,例如:1656288022-00009 of-00017.json.gz,将有17个部分文件。 我写的代码要…
如何在Apache Beam中适当地为API呼叫创建会话
我正在为我的PCollection中的每个元素进行基本的API调用。在最初的实施中,我无需使用会话就打了电话,我的工作将大约9分钟30秒的时间用于1200行。总…
使用Apache Beam Google DataFlow和Java将带有未知JSON属性的大型JSONL文件转换为CSV
如何使用Apache Beam,Google DataFlow和Java将带有未知JSON属性的大型JSONL文件转换为CSV, 这是我的场景: Google Storage JSON属性中的一个大JSONL…
Apache Beam Pipeline:OutOfMemoryException在用Google DataFlow编写AVRO文件为Google Cloud存储时
我们在Apache Beam Java SDK 2.34.0中开发了一个批处理管道,并使用Google Cloud DataFlow Runner运行。我们有一个步骤来编写AVRO文件。 Avro Write引…
TensorFlow在AWS上扩展分布式培训
我是TensorFlow扩展的新手,我知道TFX使用Apache Beam与Spark,DataFlow等跑步者连接到GCP。我的问题是,我可以使用TFX在AWS上分发培训吗? TFX使用的…
使用DataFlow的pubsub到云存储很慢
我正在使用以下示例 snippet向GCS写下PubSub消息: class WriteToGCS(beam.DoFn): def __init__(self, output_path, prefix): self.output_path = out…