Cloud Composer / Airflow

Published 2025-02-03

I have the following task in Airflow (Cloud Composer) that triggers a Cloud DataFusion pipeline.

The problem is:
Airflow considers this task already a success when (within DataFusion) the DataProc cluster has been provisioned and the actual job has entered the RUNNING state.

But I only want it to be considered a success when it is COMPLETED.

from airflow.providers.google.cloud.operators.datafusion import \
    CloudDataFusionStartPipelineOperator

my_task = CloudDataFusionStartPipelineOperator(
    location='europe-west1',
    pipeline_name="my_datafusion_pipeline_name",
    instance_name="my_datafusion_instance_name", 
    task_id="my_task_name",
)

Answer by 或十年, 2025-02-10 17:02:53

I had to look in the source code, but the following states are the default success_states:
[PipelineStates.COMPLETED] + [PipelineStates.RUNNING]

So you have to restrict success_states to only [PipelineStates.COMPLETED], by passing the success_states keyword argument like so:

from airflow.providers.google.cloud.operators.datafusion import \
    CloudDataFusionStartPipelineOperator
from airflow.providers.google.cloud.hooks.datafusion import PipelineStates

my_task = CloudDataFusionStartPipelineOperator(
    location='europe-west1',
    pipeline_name="my_datafusion_pipeline_name",
    instance_name="my_datafusion_instance_name", 
    task_id="my_task_name",
    success_states=[PipelineStates.COMPLETED], # overwrite default success_states
    pipeline_timeout=3600, # in seconds, default is currently 300 seconds
)

See also:
Airflow documentation on the DataFusionStartPipelineOperator

Airflow source code used for success states of DataFusionStartPipelineOperator
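To see why the default makes the task succeed early, the operator's wait loop can be sketched in plain Python (no Airflow needed). The state names and the poll function below are illustrative stand-ins, not the real Data Fusion API: the operator polls the pipeline state and returns as soon as the state appears in success_states, so a default that includes RUNNING returns while the job is still in flight.

```python
# Illustrative sketch of the operator's polling loop, assuming a
# simplified state model. Not the real Data Fusion client.

COMPLETED = "COMPLETED"
RUNNING = "RUNNING"
FAILED = "FAILED"

def wait_for_pipeline(poll_state, success_states,
                      failure_states=(FAILED,), max_polls=100):
    """Poll until the pipeline reaches a success state.

    Raises on a failure state or when max_polls is exhausted
    (the analogue of pipeline_timeout).
    """
    for _ in range(max_polls):
        state = poll_state()
        if state in success_states:
            return state          # task is marked successful here
        if state in failure_states:
            raise RuntimeError(f"Pipeline failed with state {state}")
    raise TimeoutError("Pipeline did not reach a success state in time")

# With success_states=[COMPLETED] the loop keeps polling past RUNNING
# and only returns once the pipeline has actually finished.
states = iter([RUNNING, RUNNING, COMPLETED])
result = wait_for_pipeline(lambda: next(states),
                           success_states=[COMPLETED])
print(result)  # COMPLETED
```

With the default success_states ([COMPLETED, RUNNING]), the same loop would return on the very first RUNNING poll, which is exactly the premature success described in the question.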
