Airflow task stuck in queued status

Posted 2025-01-13 08:14:28


Airflow is running, but the task is stuck in the queued state.

I ran airflow scheduler.

Here are my code and a snapshot of the Airflow UI.

Can anyone explain what the problem might be?

import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd

def CSVToJson():
    df = pd.read_csv('/Users/daeyong/Desktop/Projects/Python/airflow2/file.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('fromAirflow.json', orient='records')

default_args = {
    'owner': 'paulcrickard',
    'start_date': dt.datetime(2022, 3, 10),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5)
}

with DAG('MyCSVDAG',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
    # '0 * * * *',
) as dag:
    print_starting = BashOperator(task_id='starting',
        bash_command='echo "I am reading the CSV now....."')
    CSVJson = PythonOperator(task_id='convertCSVtoJson', python_callable=CSVToJson)


print_starting >> CSVJson

[Screenshot 1: Airflow UI]

[Screenshot 2: Airflow UI]


Comments (1)

浪漫人生路 2025-01-20 08:14:28


Two possible reasons, without more context:

  1. Your default pool does not have any slots assigned or available.
  2. Your dependency declaration (print_starting >> CSVJson) needs to be indented to fall within the "with DAG" statement; see the sketch below.

Scheduler logs and an image of your Pools page would help more.
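A minimal sketch of the layout point 2 describes, assuming Airflow 2.x and keeping the question's (deprecated but still importable) operator module paths: the task declarations and the dependency line all sit inside the with DAG block.

import datetime as dt
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import pandas as pd

def CSVToJson():
    # Same conversion logic as in the question.
    df = pd.read_csv('/Users/daeyong/Desktop/Projects/Python/airflow2/file.csv')
    for i, r in df.iterrows():
        print(r['name'])
    df.to_json('fromAirflow.json', orient='records')

default_args = {
    'owner': 'paulcrickard',
    'start_date': dt.datetime(2022, 3, 10),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5)
}

with DAG('MyCSVDAG',
    default_args=default_args,
    schedule_interval=timedelta(minutes=5),
) as dag:
    print_starting = BashOperator(task_id='starting',
        bash_command='echo "I am reading the CSV now....."')
    CSVJson = PythonOperator(task_id='convertCSVtoJson',
        python_callable=CSVToJson)

    # Dependency set inside the `with DAG` block, per point 2 above.
    print_starting >> CSVJson

For point 1, the Airflow 2 CLI command airflow pools list prints each pool and its slot count; default_pool normally has 128 slots, so it would only hold tasks in queued if it was shrunk or is already full of other running tasks.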
