Dataflow // High system lag and slow watermarks
I am in the process of testing a data pipeline that is a hybrid: a streaming pipeline whose Pub/Sub messages point to GCS data files. The goal of the system is to allow a Pub/Sub message to dictate the parameters for deduplication, grouping, and file format (csv, tsv, pipe delimited, etc...). I have the system running successfully for smaller datasets, but it seems to be struggling with larger ones.
The reason we have such a setup is that the majority of the datasets can be grouped together. For example, a company selling multiple products may send those products across several messages, and we want to group them together. We leverage side inputs on these elements to drive the processing.
The data needs to be accurate, so adding a trigger that discards late data is not allowed. All data currently carries the watermark of the Pub/Sub message, which is what we expect.
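For reference, the timestamping steps (apply_datetime and AddTimestampDoFn) are roughly equivalent to the simplified sketch below; it is a sketch of the assumed behaviour, not the exact implementation:
import datetime

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


def apply_datetime(message, key='__message_dt'):
    # Assumed behaviour: stamp the message dict with the processing datetime
    # observed when the Pub/Sub message is consumed.
    message[key] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    return message


class AddTimestampDoFn(beam.DoFn):
    # Assumed behaviour: re-emit each message with its '__message_dt' value as
    # the event timestamp, so windowing and the watermark follow message time.
    def process(self, element):
        dt = datetime.datetime.fromisoformat(element['__message_dt'])
        yield TimestampedValue(element, dt.timestamp())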
Is there something I am missing in the implementation or is my expectation of performance too high?
For example:
import json

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

message = (p
| 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(subscription=self._subscription).with_output_types(bytes)
| 'Convert to JSON' >> beam.Map(lambda message_json: json.loads(message_json))
| 'Add timestamp to message' >> beam.Map(apply_datetime, key='__message_dt')
| 'timestamps' >> beam.ParDo(AddTimestampDoFn())
| 'Window into Fixed Intervals' >> beam.WindowInto(
    windowfn=FixedWindows(600)  # 10-minute fixed windows
)
)
unique_record_keys = message | "Unique Keys" >> beam.Map(
lambda m: m.get('unique_record_keys'),
) # Fetch the list of attributes/keys that we can use for the first round of deduplication
record_grouping_keys = message | "Record Grouping Keys" >> beam.Map(
lambda m: m.get('record_grouping_keys')
) # Fetch the list of keys used to group elements together, since they will be stored as an object with multiple entities in a list.
data = message | 'Read all files' >> ReadAllFromDelimitedSource(file_location_key='file_locations') # Custom PTransform that is essentially a beam.io.filebasedsource.FileBasedSource pointing to a GCS file. The file is denormalized and we are trying to normalize it
key_data = data | "Keyify on Unique" >> beam.ParDo(
GenerateKey(),
keys_list=beam.pvalue.AsList(unique_record_keys)
) # Generate a key for hashing to remove duplicate rows based on columns of the source files from GCS
grouped_data = key_data | 'Group Per Unique Key' >> beam.GroupByKey() # Hangs up here on large dataset for hours on end with watermark not passing through window and data left unprocessed
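GenerateKey itself is not shown above; a simplified sketch of how it consumes the keys_list side input looks roughly like this (the real implementation differs in details):
import hashlib

import apache_beam as beam


class GenerateKey(beam.DoFn):
    # Key each row by a hash of the columns named in the side input, so the
    # following GroupByKey can deduplicate rows sharing those column values.
    def process(self, row, keys_list):
        # keys_list arrives via beam.pvalue.AsList, one list of column names
        # per Pub/Sub message; assume the first entry applies to this row.
        columns = keys_list[0] if keys_list else sorted(row.keys())
        key_material = '|'.join(str(row.get(c, '')) for c in columns)
        dedup_key = hashlib.sha256(key_material.encode('utf-8')).hexdigest()
        yield (dedup_key, row)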
Smaller dataset that processes successfully:
Larger dataset that hangs (also note that it looks like it fires to this step a handful of times; the original dataset is only 2 million lines from the GCS file, which is definitely adding to the issue):
I have bumped the memory and CPU since it looked like it was hanging on that, but to no avail. It experienced the same issue later on.
Update: 2022-06-05
On further inspection, the code was operating correctly. After increasing memory and performing a full inspection of the datasets, it turned out the grouping keys were missing a certain column. Once corrected, the data processed as expected in 20 minutes.
2 Answers
I'm not sure what that means for an application, depending on the system, or how it would work (maybe it depends on how you consume the side input unique_record_keys in your GenerateKey DoFn). From the look of the pipeline, you are applying panes of arbitrary Pub/Sub messages with artificial timestamps to all of the bounded data again and again. What does key_data hold, and what does the GBK mean if all your keys are unique?
Performance-wise, you can try Dataflow Shuffle if it's not enabled.
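For example, something along these lines; since the pipeline is streaming, the service-side shuffle is Streaming Engine, while batch jobs would use Dataflow Shuffle (the option names below are standard Dataflow runner options, but verify them against your SDK version):
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                # placeholder project id
    region='us-central1',                # placeholder region
    temp_location='gs://my-bucket/tmp',  # placeholder bucket
    streaming=True,
    enable_streaming_engine=True,        # streaming counterpart of Dataflow Shuffle
    # experiments=['shuffle_mode=service'],  # batch-only Dataflow Shuffle
)

with beam.Pipeline(options=options) as p:
    ...  # build the pipeline shown in the question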
Please double-check the grouping key and ensure that you are not attempting to group too large a dataset, which will only take time to finish.
The issue experienced on this problem was that the group-by key set had an error and was not properly grouping on the unique elements; as a result, all keys were considered repeated, so the whole windowed dataset was treated as a single unique set. This did not show up on the smaller datasets and only surfaced because of the data-driven nature of the messages.
Upon correction of the group-by keys at the key step (i.e. 'Keyify on Unique'; double-check your casing), the system was able to correctly group into smaller datasets and properly define unique sets.
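As a simplified, hypothetical illustration of the failure mode (not the actual pipeline code): if the key columns named in the message do not match the file's header casing, every row can end up keyed on the same degenerate value, and the whole window collapses onto one hot key at the GroupByKey. A case-insensitive lookup, or failing fast on missing columns, is one way to guard against it:
import hashlib


def build_dedup_key(row, key_columns):
    # Match column names case-insensitively and refuse to key on missing columns.
    lowered = {c.lower(): v for c, v in row.items()}
    missing = [c for c in key_columns if c.lower() not in lowered]
    if missing:
        # Surface bad grouping keys instead of silently keying every row on ''.
        raise ValueError(f'Grouping columns not found in row: {missing}')
    material = '|'.join(str(lowered[c.lower()]) for c in key_columns)
    return hashlib.sha256(material.encode('utf-8')).hexdigest()


row = {'Company': 'Acme', 'Product': 'Widget', 'Price': '9.99'}
print(build_dedup_key(row, ['company', 'product']))  # works despite casing differences
# build_dedup_key(row, ['company_id'])               # raises instead of mis-keying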