Airflow: Job <Job-ID> was killed before it finished (likely due to running out of memory)
I have a linear DAG with two tasks: the truthy/falsy return value of the first task decides whether the second task is executed. I am using ShortCircuitOperator for the first task so that the second task can be bypassed when needed. Here is my DAG code:
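from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator

# DEFAULT_ARGS, EXECUTOR_CONFIG, test_script_1 and test_script_2
# are defined elsewhere in the module (not shown here).
DAG_VERSION = "1.0.0"

with DAG(
    "sample_dag",
    catchup=False,
    tags=[DAG_VERSION],
    max_active_runs=1,
    schedule_interval=None,
    default_args=DEFAULT_ARGS,
) as dag:
    dag.doc_md = "Sample DAG"

    # Gate task: a falsy return value skips everything downstream.
    TASK_1 = ShortCircuitOperator(
        task_id="task_1",
        python_callable=test_script_1,
        executor_config=EXECUTOR_CONFIG,
    )

    TASK_2 = PythonOperator(
        task_id="task_2",
        python_callable=test_script_2,
        executor_config=EXECUTOR_CONFIG,
    )

    TASK_1 >> TASK_2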
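To make the intended control flow concrete, here is a plain-Python sketch of the short-circuit behaviour this DAG is meant to have. It does not use Airflow at all: `run_linear` and the `fake_*` callables are hypothetical names used only for illustration, and the real operator skips downstream tasks through task states rather than a loop.

```python
from typing import Any, Callable, List, Tuple


def run_linear(
    gate: Callable[[], Any], downstream: List[Callable[[], Any]]
) -> Tuple[Any, List[str]]:
    """Run `gate`, then run the downstream callables only if its result is truthy.

    Returns the gate's result and the names of the downstream callables
    that actually ran. This mimics the intent of a short-circuit task,
    not Airflow's implementation.
    """
    result = gate()
    executed: List[str] = []
    if result:
        for task in downstream:
            task()
            executed.append(task.__name__)
    return result, executed


def fake_task_1_truthy() -> List[str]:
    return ["a", "b"]  # a non-empty list is truthy


def fake_task_1_falsy() -> List[str]:
    return []  # an empty list is falsy


def fake_task_2() -> None:
    pass  # stand-in for the real second task


if __name__ == "__main__":
    print(run_linear(fake_task_1_truthy, [fake_task_2]))
    print(run_linear(fake_task_1_falsy, [fake_task_2]))
```

With a truthy gate the second callable runs; with a falsy gate it is bypassed, which is the behaviour expected from `task_1 >> task_2` above.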
However, when I run the DAG and the first task returns a truthy value, I get the following in the first task's log:
task_1 logs
Marking task as SUCCESS. dag_id=sample_dag, task_id=task_1, execution_date=20220606T060000, start_date=20220606T070012, end_date=20220606T070014
[2022-06-06, 07:00:17 UTC] State of this instance has been externally set to success. Terminating instance.
[2022-06-06, 07:00:17 UTC] Sending Signals.SIGTERM to GPID 18
[2022-06-06, 07:01:17 UTC] process psutil.Process(pid=18, name='airflow task runner: sample_dag task_1 scheduled__2022-06-06T06:00:00+00:00 7037', status='sleeping', started='07:00:12') did not respond to SIGTERM. Trying SIGKILL
[2022-06-06, 07:01:17 UTC] Process psutil.Process(pid=18, name='airflow task runner: sample_dag task_1 scheduled__2022-06-06T06:00:00+00:00 7037', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='07:00:12') (18) terminated with exit code Negsignal.SIGKILL
[2022-06-06, 07:01:17 UTC] Job 7037 was killed before it finished (likely due to running out of memory)
I am using the return value of the first task in the second task. When I try to log the first task's XCom value inside the second task, I get None, which causes the second task to fail. This is my code for accessing the first task's XCom value inside the second task:
def test_script_2(**context: models.xcom) -> List[str]:
    task_instance = context["task_instance"]
    return_value = task_instance.xcom_pull(task_ids="task_1")
    print("logging return value of first task ", return_value)
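As a reference point for the pull above, the following is a simplified, Airflow-free model of how an XCom lookup behaves. `FakeXComStore` is a hypothetical class written for illustration, not Airflow's storage layer; the one detail taken from Airflow is that returned values are stored under the default key `return_value`. The sketch assumes that a pull for a task which never pushed anything under that key yields None instead of raising.

```python
from typing import Any, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # Airflow's default XCom key for returned values


class FakeXComStore:
    """Toy stand-in for XCom storage, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, value: Any, key: str = RETURN_KEY) -> None:
        # Store a value for the given task under the given key.
        self._data[(task_id, key)] = value

    def pull(self, task_id: str, key: str = RETURN_KEY) -> Optional[Any]:
        # A missing entry yields None rather than raising an error.
        return self._data.get((task_id, key))


if __name__ == "__main__":
    store = FakeXComStore()
    print(store.pull("task_1"))        # nothing pushed yet
    store.push("task_1", ["x", "y"])   # simulate an operator pushing its result
    print(store.pull("task_1"))
```

Under this model, a None in the second task means no value was stored for `task_1` under the default key. Whether the operator used for `task_1` pushes its callable's return value in a given Airflow version is worth checking; an explicit `task_instance.xcom_push(key=..., value=...)` in the first task, pulled with the same key, is one way to test that.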
I am running Airflow 2.2.2 with the Kubernetes executor.
Is the None XCom value caused by the out-of-memory issue in the first task? I tried returning a fixed value from the first task, but None was again returned in the second task, with the following log:
task_2 logs
Marking task as SUCCESS. dag_id=sample_dag, task_id=task_2, execution_date=20220606T095611, start_date=20220606T095637, end_date=20220606T095638
[2022-06-06, 09:56:42 UTC] State of this instance has been externally set to success. Terminating instance.
[2022-06-06, 09:56:42 UTC] Sending Signals.SIGTERM to GPID 18
[2022-06-06, 09:57:42 UTC] process psutil.Process(pid=18, name='airflow task runner: sample_dag task_2 manual__2022-06-06T09:56:11.804005+00:00 7051', status='sleeping', started='09:56:37') did not respond to SIGTERM. Trying SIGKILL
[2022-06-06, 09:57:42 UTC] Process psutil.Process(pid=18, name='airflow task runner: sample_dag task_2 manual__2022-06-06T09:56:11.804005+00:00 7051', status='terminated', exitcode=<Negsignal.SIGKILL: -9>, started='09:56:37') (18) terminated with exit code Negsignal.SIGKILL
[2022-06-06, 09:57:42 UTC] Job 7051 was killed before it finished (likely due to running out of memory)
I am unable to find the issue in the code. I would appreciate any hint on where I am going wrong and how to get the first task's XCom value.
Thanks