Using partial() and expand() with PythonOperator in Airflow 2.3.1

I'm interested in creating dynamic processes, so I looked into the partial() and expand() methods introduced in Airflow 2.3.

I have implemented the following code:

from airflow.operators.python_operator import PythonOperator
from airflow.decorators import task
from airflow import DAG
from datetime import datetime as dt
import pendulum

local_tz = pendulum.timezone("America/Mexico_City")

@task
def add_one(x):
    print(x)

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': dt(2020, 7, 17, tzinfo = local_tz),
}

with DAG('dag_test_j',
         default_args = default_args,
         schedule_interval = '0 07 10,25 * *',
         catchup = False,
         concurrency = 4,  
         max_active_runs = 1
         ) as dag:

    python_test = PythonOperator.partial(
        task_id="python_test_task",
        python_callable=add_one,
    ).expand(op_kwargs={'x':[1,2,3]})

python_test

The problem is the args. I'm getting the following error:

[2022-05-27, 14:08:37 CDT] {taskinstance.py:1570} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Me
AIRFLOW_CTX_DAG_ID=dag_test_j
AIRFLOW_CTX_TASK_ID=python_test_task
AIRFLOW_CTX_EXECUTION_DATE=2022-05-28T00:08:36.518574+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-05-28T00:08:36.518574+00:00
[2022-05-27, 14:08:37 CDT] {taskinstance.py:1890} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/root/miniconda3/lib/python3.9/site-packages/airflow/operators/python.py", line 168, in execute
    context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
  File "/root/miniconda3/lib/python3.9/site-packages/airflow/utils/context.py", line 256, in context_merge
    context.update(*args, **kwargs)
  File "/root/miniconda3/lib/python3.9/_collections_abc.py", line 946, in update
    for key, value in other:
ValueError: not enough values to unpack (expected 2, got 1)

Just one instance ran, and I already tried putting other values in the .expand(op_kwargs={'x':[1,2,3]}) call, like:

expand(op_kwargs={'x':[1,2,3],'y':[1,2]})

This ran 2 instances instead of 1.
I also tried:

.expand(1,op_kwargs={'x':[1,2,3]}) 

This gives me an error on the main screen:

Broken DAG: [/root/airflow/dags/dag_test_i copy.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed
  File "/root/airflow/dags/dag_test_i copy.py", line 28, in <module>
    python_test = PythonOperator.partial(
TypeError: expand() takes 1 positional argument but 2 were given

How do I resolve this error?

Answer (醉南桥):

The right syntax is a list of op_kwargs dicts, one per mapped task: expand(op_kwargs=[{'x': 1}, {'x': 2}, {'x': 3}]) instead of expand(op_kwargs={'x': [1, 2, 3]}). When you pass a single dict, Airflow maps over the dict's entries, creating one task instance per key (which is why you got one instance for {'x': [1, 2, 3]} and two for {'x': [1, 2, 3], 'y': [1, 2]}), and the per-instance op_kwargs is then not a dict of keyword arguments at all, which is what triggers the "not enough values to unpack" error. With a list you can also provide different op_kwargs for each mapped task, e.g. expand(op_kwargs=[{'x': 1, 'y': 1}, {'x': 2}, {'x': 3, 'z': 3}]). Note that expand_kwargs(), which expands over the operator's own keyword arguments (e.g. expand_kwargs([{'op_kwargs': {'x': 1}}, {'op_kwargs': {'x': 2}}])), was only added in Airflow 2.4, so it is not available in 2.3.1.

Also remove the @task decorator from add_one, since you are running it through a PythonOperator:

from airflow.operators.python import PythonOperator  # current import path; python_operator is deprecated
from airflow import DAG
from datetime import datetime as dt
import pendulum

local_tz = pendulum.timezone("America/Mexico_City")

# Plain function: without the @task decorator it can be used as a python_callable
def add_one(x):
    print(x)

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': dt(2020, 7, 17, tzinfo = local_tz),
}

with DAG('dag_test_j',
         default_args = default_args,
         schedule_interval = '0 07 10,25 * *',
         catchup = False,
         concurrency = 4,
         max_active_runs = 1
         ) as dag:

    # One mapped task instance per dict in the list
    python_test = PythonOperator.partial(
        task_id="python_test_task",
        python_callable=add_one,
    ).expand(op_kwargs=[{'x': 1}, {'x': 2}, {'x': 3}])
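Alternatively, since your original code already used the @task decorator, you can keep it and skip PythonOperator.partial entirely: a TaskFlow-decorated function can be mapped directly with expand(). A minimal sketch of that alternative, assuming Airflow 2.3+ (the dag_test_taskflow id is just an illustrative name):

from airflow.decorators import task
from airflow import DAG
from datetime import datetime as dt
import pendulum

local_tz = pendulum.timezone("America/Mexico_City")

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': dt(2020, 7, 17, tzinfo = local_tz),
}

with DAG('dag_test_taskflow',
         default_args = default_args,
         schedule_interval = '0 07 10,25 * *',
         catchup = False,
         max_active_runs = 1
         ) as dag:

    @task
    def add_one(x):
        print(x)

    # expand() on a TaskFlow function maps over the list directly:
    # one task instance per element, each called as add_one(x=<element>)
    add_one.expand(x=[1, 2, 3])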