Airflow: Job <job-id> was killed before it finished (likely due to running out of memory)

Posted on 2025-02-04 17:19:09

I have a linear DAG with two tasks: the truthy/falsy return value of the first task decides whether the second task is executed. I am using a ShortCircuitOperator for the first task so that the second task can be bypassed when needed. The following is my DAG code:

DAG_VERSION = "1.0.0"
with DAG(
    "sample_dag",
    catchup=False,
    tags=[DAG_VERSION],
    max_active_runs=1,
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
) as dag:
    
    dag.doc_md = "Sample DAG"

    TASK_1 = ShortCircuitOperator(
        task_id="task_1",
        python_callable=test_script_1,
        executor_config=EXECUTOR_CONFIG,
    )
    
    TASK_2 = PythonOperator(
        task_id="task_2",
        python_callable=test_script_2,
        executor_config=EXECUTOR_CONFIG,
    )

    TASK_1 >> TASK_2
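
To make the intended gating explicit, here is a minimal plain-Python sketch of the short-circuit idea, with no Airflow dependency. The names `run_linear`, `first`, and `second` are hypothetical and exist only for illustration; this models the behaviour described above and is not Airflow's implementation. In particular, it hands the first result straight to the second step, whereas in Airflow the value would have to travel through XCom.

```python
from typing import Any, Callable, List


def run_linear(first: Callable[[], Any], second: Callable[[Any], Any]) -> List[str]:
    """Run `first`; run `second` only if `first` returned a truthy value."""
    states = []
    result = first()
    states.append("task_1: success")
    if result:
        # Truthy result: the downstream step proceeds.
        second(result)
        states.append("task_2: success")
    else:
        # Falsy result: the downstream step is skipped, not failed.
        states.append("task_2: skipped")
    return states


print(run_linear(lambda: ["a"], lambda value: value))
print(run_linear(lambda: [], lambda value: value))
```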

However, when I run the DAG, I get the following in the log for the first task when it returns a truthy value:

task_1 logs

Marking task as SUCCESS. dag_id=sample_dag, task_id=task_1, execution_date=20220606T060000, start_date=20220606T070012, end_date=20220606T070014
[2022-06-06, 07:00:17 UTC] State of this instance has been externally set to success. Terminating instance.
[2022-06-06, 07:00:17 UTC] Sending Signals.SIGTERM to GPID 18
[2022-06-06, 07:01:17 UTC] process psutil.Process(pid=18, name='airflow task runner: sample_dag task_1 scheduled__2022-06-06T06:00:00+00:00 7037', status='sleeping', started='07:00:12') did not respond to SIGTERM. Trying SIGKILL
[2022-06-06, 07:01:17 UTC] Process psutil.Process(pid=18, name='airflow task runner: sample_dag task_1 scheduled__2022-06-06T06:00:00+00:00 7037', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='07:00:12') (18) terminated with exit code Negsignal.SIGKILL
[2022-06-06, 07:01:17 UTC] Job 7037 was killed before it finished (likely due to running out of memory)
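
Read in order, the last lines of this log look like an escalation: SIGTERM is sent at 07:00:17, the process has not responded 60 seconds later, SIGKILL is sent, and the exit code is -9. The out-of-memory wording may therefore be a generic message attached to that exit code rather than evidence of an actual memory problem; that is an assumption, not something the log confirms. The sketch below reproduces the same escalation with the standard `subprocess` module on a POSIX system. The helper `terminate_then_kill` and the child script are hypothetical and are not Airflow code.

```python
import signal
import subprocess
import sys

# Child that ignores SIGTERM, standing in for a process that does not exit.
CHILD = (
    "import signal, time\n"
    "signal.signal(signal.SIGTERM, signal.SIG_IGN)\n"
    "print('ready', flush=True)\n"
    "time.sleep(60)\n"
)


def terminate_then_kill(grace_seconds: float) -> int:
    """Send SIGTERM, wait `grace_seconds`, then fall back to SIGKILL."""
    proc = subprocess.Popen([sys.executable, "-c", CHILD], stdout=subprocess.PIPE)
    proc.stdout.readline()  # wait until the child has installed its handler
    proc.terminate()  # SIGTERM, which the child ignores
    try:
        proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL, which cannot be ignored
        proc.wait()
    proc.stdout.close()
    return proc.returncode


exit_code = terminate_then_kill(grace_seconds=1.0)
print(exit_code == -signal.SIGKILL)
```

On POSIX, `subprocess` reports a process killed by signal N as return code -N, which matches the `Negsignal.SIGKILL: -9` shown in the log.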

I am using the return value of the first task in the second task. When I try to log the XCom value of the first task inside the second task, I get None, which causes the second task to fail. This is my code for accessing the first task's XCom value inside the second task:

def test_script_2(**context: models.xcom) -> List[str]:
    task_instance = context["task_instance"]
    return_value = task_instance.xcom_pull(task_ids="task_1")
    print("logging return value of first task ", return_value)
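
For context, `xcom_pull(task_ids="task_1")` looks up the default key `return_value` and yields None when nothing is stored under it. One possibility worth checking (an assumption, not verified here against Airflow 2.2.2) is that the ShortCircuitOperator in this version does not store the callable's return value as an XCom. A workaround sketch is to push the value explicitly under a custom key and pull it with that same key. To keep the example runnable without Airflow, `FakeTaskInstance` is a hypothetical stand-in that mimics only `xcom_push` and `xcom_pull`; the callables and the key `task_1_result` are also hypothetical.

```python
from typing import Any, Dict, Optional, Tuple

XComStore = Dict[Tuple[str, str], Any]


class FakeTaskInstance:
    """Hypothetical stand-in exposing only xcom_push / xcom_pull."""

    def __init__(self, task_id: str, store: XComStore) -> None:
        self.task_id = task_id
        self._store = store

    def xcom_push(self, key: str, value: Any) -> None:
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Optional[Any]:
        # Returns None when no value exists for (task, key).
        return self._store.get((task_ids, key))


def first_task_callable(**context: Any) -> bool:
    value = ["a", "b"]
    # Push explicitly instead of relying on the return value being stored.
    context["task_instance"].xcom_push(key="task_1_result", value=value)
    return bool(value)


def second_task_callable(**context: Any) -> Any:
    task_instance = context["task_instance"]
    return task_instance.xcom_pull(task_ids="task_1", key="task_1_result")


store: XComStore = {}
first_task_callable(task_instance=FakeTaskInstance("task_1", store))
ti2 = FakeTaskInstance("task_2", store)
print(ti2.xcom_pull(task_ids="task_1"))  # default key, nothing stored under it
print(second_task_callable(task_instance=ti2))
```

In a real DAG the same two calls would be made on `context["task_instance"]` inside the task callables.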

I am running Airflow 2.2.2 with the Kubernetes executor.

Is the None XCom value due to the out-of-memory issue in the first task? I tried returning a fixed value from the first task, but None was again returned in the second task, with the following log:

task_2 logs

 Marking task as SUCCESS. dag_id=sample_dag, task_id=task_2, execution_date=20220606T095611, start_date=20220606T095637, end_date=20220606T095638
[2022-06-06, 09:56:42 UTC] State of this instance has been externally set to success. Terminating instance.
[2022-06-06, 09:56:42 UTC] Sending Signals.SIGTERM to GPID 18
[2022-06-06, 09:57:42 UTC] process psutil.Process(pid=18, name='airflow task runner: sample_dag task_2 manual__2022-06-06T09:56:11.804005+00:00 7051', status='sleeping', started='09:56:37') did not respond to SIGTERM. Trying SIGKILL
[2022-06-06, 09:57:42 UTC] Process psutil.Process(pid=18, name='airflow task runner: sample_dag task_2 manual__2022-06-06T09:56:11.804005+00:00 7051', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='09:56:37') (18) terminated with exit code Negsignal.SIGKILL
[2022-06-06, 09:57:42 UTC] Job 7051 was killed before it finished (likely due to running out of memory)

I am unable to find the issue in the code. I would appreciate any hint on where I am going wrong and how to get the XCom value of the first task.

Thanks
