Apache Beam Dataflow runner throws an error in 'Write to Datastore/Enforce throttling during ramp-up'
I recently updated my pipelines on GCP Dataflow from version 2.27 to version 2.34. Pipelines using the WriteToDatastore connector failed with the following error:
Error message from worker: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1369, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py", line 83, in process
    max_ops_budget = self._calc_max_ops_budget(self._first_instant, instant)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py", line 74, in _calc_max_ops_budget
    max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))
OverflowError: (34, 'Numerical result out of range')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 213, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in
  File "dataflow_worker/shuffle_operations.py", line 261, in
  File "apache_beam/runners/worker/operations.py", line 714, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1235, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1316, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 571, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1369, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py", line 83, in process
    max_ops_budget = self._calc_max_ops_budget(self._first_instant, instant)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/gcp/datastore/v1new/rampup_throttling_fn.py", line 74, in _calc_max_ops_budget
    max_ops_budget = int(self._BASE_BUDGET / self._num_workers * (1.5**growth))
RuntimeError: OverflowError: (34, 'Numerical result out of range') [while running 'Write to Data-store/Enforce throttling during ramp-up']
The jobs worked fine until now.
I checked the apache-beam Python SDK updates and found that release 2.32 added ramp-up throttling to the DatastoreIO connector ([BEAM-12272] Python - Backport FirestoreIO connector's ramp-up to DatastoreIO connector - ASF JIRA).
This introduced two new parameters for the connector, throttle_rampup and hint_num_workers, as described in the apache_beam.io.gcp.datastore.v1new.datastoreio module documentation (Apache Beam docs).
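For reference, a minimal sketch of how such a pipeline invokes the connector; the project id, kind, and data below are placeholders, and throttle_rampup=True with hint_num_workers=500 are the documented defaults rather than tuned values:

# Minimal sketch of a WriteToDatastore pipeline; 'my-gcp-project' and
# 'ExampleKind' are placeholders.
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1new.datastoreio import WriteToDatastore
from apache_beam.io.gcp.datastore.v1new.types import Entity, Key

def make_entity(i):
    # Build a trivial Datastore entity keyed by an integer id.
    key = Key(['ExampleKind', i], project='my-gcp-project')
    entity = Entity(key)
    entity.set_properties({'value': i})
    return entity

with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create(range(10))
     | 'ToEntity' >> beam.Map(make_entity)
     | 'Write to Datastore' >> WriteToDatastore(
         project='my-gcp-project',
         throttle_rampup=True,    # default; False disables ramp-up throttling
         hint_num_workers=500))   # default hint for the expected worker count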
I have not made any changes to the parameter values. I need help in understanding what the parameters mean, particularly hint_num_workers, and why the job is failing with the default values.
However, when setting throttle_rampup=False, the job runs fine. If I want to follow best practice and use throttle_rampup=True, how do I make the job run successfully?
Thanks in advance.
Answer:
This is a known issue in rampup_throttling_fn.py, caused by the data type of the max_ops_budget variable, which leads to an overflow. You can see the issue report on the Beam GitHub; a fix has already been merged to master. So, updating to a newer version should solve the issue (or downgrading to an earlier version while the fixed release is not yet available).
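The failure mode is easy to reproduce outside Beam. The numbers below are illustrative, not the connector's exact internals: once enough ramp-up intervals have elapsed, 1.5**growth exceeds Python's float range and the int() conversion is never reached:

# Illustrative reproduction of the overflow (not Beam's verbatim code).
BASE_BUDGET = 500   # assumed base write budget, for illustration
num_workers = 500   # the default hint_num_workers
growth = 2000       # many elapsed ramp-up intervals

try:
    max_ops_budget = int(BASE_BUDGET / num_workers * (1.5 ** growth))
except OverflowError as err:
    print(err)  # (34, 'Numerical result out of range')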
About the meaning of the parameters, there isn't much beyond the documentation description:
throttle_rampup: i.e., you have a number of workers that may increase due to load, and that increase can be gradual or abrupt.
hint_num_workers: i.e., the expected final number of workers; from that, the function can calculate how large each worker's write budget may be in a given time slot so the ramp-up is gradual rather than abrupt.
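Putting those together, here is a rough sketch of how a per-worker budget could be derived under the "500/50/5" ramp-up rule (a shared base budget of 500 ops/sec that grows 50% every 5 minutes); the constants and exact formula are assumptions for illustration, not Beam's verbatim internals:

# Rough sketch of a per-worker ramp-up budget under the "500/50/5" rule.
def max_ops_budget(elapsed_minutes, hint_num_workers, base_budget=500):
    # One growth step per 5 elapsed minutes; budget is split across workers.
    growth = max(elapsed_minutes / 5.0, 0)
    return max(int(base_budget / hint_num_workers * 1.5 ** growth), 1)

print(max_ops_budget(0, 500))   # 1 op/sec per worker right after start
print(max_ops_budget(30, 500))  # ~11 ops/sec per worker after 30 minutes
print(max_ops_budget(30, 10))   # ~569 ops/sec per worker if only 10 workers expected

This also shows why hint_num_workers matters: the smaller the hint, the larger the share of the budget each worker is allowed from the start.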