S3KeySensor in Airflow 2

Posted on 2025-01-25 21:52:17


I have a DAG called my_dag.py that uses the S3KeySensor in Airflow 2 to check whether an S3 key exists. When I use the sensor directly inside the DAG, it works:

with TaskGroup('check_exists') as check_exists: 
    
  path = 's3://my-bucket/data/my_file'
  poke_interval = 30
  timeout = 60*60
  mode = 'reschedule'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id = 'check_' + dependency_name + '_exists',
    bucket_key = path,
    poke_interval = poke_interval,
    timeout = timeout,
    mode = mode
  )

The log of the above looks like:

[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE

This is correct. The reschedule is expected, because the file does not exist yet.

However, I want to check any number of paths in other DAGs, so I moved the sensor into a function called test in another file called helpers.py. In my_dag.py I use a PythonOperator inside the task group to call test. It looks like this:

with TaskGroup('check_exists') as check_exists:

  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  wait_for_dependencies = PythonOperator(
    task_id = 'wait_for_my_file',
    python_callable = test,
    op_kwargs = {
      'dependency_name': dependency_name,
      'path': path
    },
    dag = dag
  )

  wait_for_dependencies

The function test in helpers.py looks like:

def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):

    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )

However, when I run the DAG, the step is marked as success even though the file is not there. The logs show:

[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.

It seems Airflow doesn't like using a sensor via a PythonOperator. Is this true, or am I doing something wrong?

My goal is to loop through multiple paths and check whether each one exists. I also do this in other DAGs, which is why I'm putting the sensor in a function that lives in another file.

If there are alternative ways to do this, I'm open to them!

Thanks for your help!


Comments (1)

谁的新欢旧爱 2025-02-01 21:52:17


This will not work as you expect.
You have created a case of an operator inside an operator. See this answer for information about what that means.

In your case you wrapped the S3KeySensor with a PythonOperator. This means that when the PythonOperator runs, it only executes the init function of S3KeySensor; it does not invoke the logic of the operator itself.
Using an operator inside an operator is bad practice.

Your case is even more extreme, as you are trying to use a sensor inside an operator. A sensor needs to invoke its poke() function on every poking cycle.
To simplify: you cannot get the benefit of a sensor with mode='reschedule' when you set it up this way, because reschedule means you want to release the worker if the condition is not yet met, and the PythonOperator does not know how to do that.
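In other words, when the PythonOperator runs, the callable effectively does no more than the following (a simplified sketch of the behaviour described above, with illustrative comments):

def test(dependency_name, path, poke_interval=30, timeout=60 * 60, mode='reschedule'):
    # This line only constructs a sensor object inside the running worker.
    # The object is never registered as a task, so its poke() logic never runs.
    S3KeySensor(
        task_id='check_' + dependency_name + '_exists',
        bucket_key=path,
        poke_interval=poke_interval,
        timeout=timeout,
        mode=mode,
    )
    # The callable then returns None, which is why the log shows
    # "Done. Returned value was: None" and the task is marked SUCCESS.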

How to solve your issue:

Option 1:

From the code you showed you can simply do:

with TaskGroup('check_exists') as check_exists:
    
  path = 's3://my-bucket/data/my_file'
  dependency_name = 'my_file'

  S3KeySensor(
    task_id='check_' + dependency_name + '_exists',
    bucket_key=path,
    poke_interval=30,
    timeout=60 * 60,
    mode='reschedule'
  )

I don't see a reason why this can't work for you.
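If the goal is to check several paths, option 1 also extends to a simple loop that creates one sensor per path inside the task group. A minimal sketch, assuming it sits inside the existing with DAG(...) block in my_dag.py and a recent amazon provider for the import path; the dictionary of paths is a made-up example:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.utils.task_group import TaskGroup

# Inside the existing `with DAG(...)` block in my_dag.py:
with TaskGroup('check_exists') as check_exists:
    # Hypothetical mapping of dependency names to S3 keys.
    dependencies = {
        'my_file': 's3://my-bucket/data/my_file',
        'other_file': 's3://my-bucket/data/other_file',
    }

    for dependency_name, path in dependencies.items():
        S3KeySensor(
            task_id='check_' + dependency_name + '_exists',
            bucket_key=path,
            poke_interval=30,
            timeout=60 * 60,
            mode='reschedule',
        )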

Option 2:

If for some reason option 1 is not good for you, then create a custom sensor that also accepts dependency_name and path, and use it like any other operator.
I didn't test it, but something like the following should work:

class MyS3KeySensor(S3KeySensor):
    def __init__(
        self,
        *,
        dependency_name: str = None,
        path: str = None,
        **kwargs,
    ):
        # Derive task_id and bucket_key from the custom arguments and pass them
        # to the parent constructor, which registers the task under that task_id.
        super().__init__(
            task_id='check_' + dependency_name + '_exists',
            bucket_key=path,
            **kwargs,
        )
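A usage sketch, assuming the class above lives in the existing helpers.py and that this runs inside a DAG definition; the list of dependencies is illustrative:

from helpers import MyS3KeySensor  # assuming the class above lives in helpers.py
from airflow.utils.task_group import TaskGroup

# Inside the existing `with DAG(...)` block:
with TaskGroup('check_exists') as check_exists:
    # Hypothetical list of (dependency_name, path) pairs to check.
    for dependency_name, path in [
        ('my_file', 's3://my-bucket/data/my_file'),
        ('other_file', 's3://my-bucket/data/other_file'),
    ]:
        MyS3KeySensor(
            dependency_name=dependency_name,
            path=path,
            poke_interval=30,
            timeout=60 * 60,
            mode='reschedule',
        )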