带配置的气流触发 dag

发布于 2025-01-17 19:10:50 字数 1062 浏览 2 评论 0原文

我尝试使用“trigger w/config”在 dag 中使用配置。

    def execute(**kwargs):
    
    dag_run = kwargs['dag_run']
    start_date = dag_run.conf['start_dt'] if 'start_dt' in dag_run.conf.keys() else kwargs['start_dt']
    end_date = dag_run.conf['end_dt'] if 'end_dt' in dag_run.conf.keys() else kwargs['end_dt']
    
    print(f'start_date = {start_date}, end_date = {end_date}')
dag = DAG(
    "corp_dev_ods_test_dag",
    default_args=default_args,
    description='DAG',
    schedule_interval='10 1 * * *',
    start_date=days_ago(0),
    #params={'dt' : '{{ macros.ds_add(ds, -7) }}'},
    catchup=False,
    tags=['dev']
)
run_submit = PythonVirtualenvOperator(
    task_id='run_submit',
    requirements=dag_requirements,
    python_callable=execute,
    system_site_packages=False,
    dag=dag,
    op_kwargs={'start_dt' : '{{ macros.ds_add(ds, -7) }}', 'end_dt': '{{ macros.ds_add(ds, -7) }}'}
)
run_submit

我收到“KeyError”:kwargs[“dag_run”]。但在 PythonOperator (而不是 PythonVirtualenvOperator)的情况下,它可以工作。

那么,我如何在我的 dag 中使用这些参数呢?

I try to use configs in dag using "trigger w/config".

    def execute(**kwargs):
    
    dag_run = kwargs['dag_run']
    start_date = dag_run.conf['start_dt'] if 'start_dt' in dag_run.conf.keys() else kwargs['start_dt']
    end_date = dag_run.conf['end_dt'] if 'end_dt' in dag_run.conf.keys() else kwargs['end_dt']
    
    print(f'start_date = {start_date}, end_date = {end_date}')
dag = DAG(
    "corp_dev_ods_test_dag",
    default_args=default_args,
    description='DAG',
    schedule_interval='10 1 * * *',
    start_date=days_ago(0),
    #params={'dt' : '{{ macros.ds_add(ds, -7) }}'},
    catchup=False,
    tags=['dev']
)
run_submit = PythonVirtualenvOperator(
    task_id='run_submit',
    requirements=dag_requirements,
    python_callable=execute,
    system_site_packages=False,
    dag=dag,
    op_kwargs={'start_dt' : '{{ macros.ds_add(ds, -7) }}', 'end_dt': '{{ macros.ds_add(ds, -7) }}'}
)
run_submit

I got "KeyError": kwargs["dag_run"]. But in case of PythonOperator (Instead of PythonVirtualenvOperator) it works.

So, how can I use such parameters in my dag?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

眉目亦如画i 2025-01-24 19:10:50

您需要在任务中提供一个空的 params 变量,例如:

from airflow.decorators import dag, task
from datetime import datetime


default_params = {"start_date": "2022-01-01", "end_date": "2022-12-01"}

@dag(
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['using_params'],
    params=default_params
)
def mydag():

    @task
    def extract(params={}):
        import helper
        filenames = helper.extract(start=params.get("start_date"))
        return filenames

    extract()

_dag = mydag()

现在在 UI 中,当您使用配置触发 DAG 时,您应该能够查看并更改默认值参数。并且能够在您的 dag 任务中访问它。

You need to provide an empty params variable in your task, for example:

from airflow.decorators import dag, task
from datetime import datetime


default_params = {"start_date": "2022-01-01", "end_date": "2022-12-01"}

@dag(
    schedule=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['using_params'],
    params=default_params
)
def mydag():

    @task
    def extract(params={}):
        import helper
        filenames = helper.extract(start=params.get("start_date"))
        return filenames

    extract()

_dag = mydag()

Now in the UI when you Trigger DAG w/ config you should be able to see and change the default params. And be able to access it in your dag task.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文