S3KeySensor in Airflow 2
I have a DAG called my_dag.py that uses the S3KeySensor in Airflow 2 to check whether an S3 key exists. When I use the sensor directly inside the DAG, it works:
with TaskGroup('check_exists') as check_exists:
    path = 's3://my-bucket/data/my_file'
    poke_interval = 30
    timeout = 60 * 60
    mode = 'reschedule'
    dependency_name = 'my_file'

    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )
The logs for the above look like:
[2022-05-03, 19:51:26 UTC] {s3.py:105} INFO - Poking for key : s3://my-bucket/data/my_file
[2022-05-03, 19:51:26 UTC] {base_aws.py:90} INFO - Retrieving region_name from Connection.extra_config['region_name']
[2022-05-03, 19:51:27 UTC] {taskinstance.py:1701} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
This is correct. The reschedule is expected because the file does not exist yet.
However, I want to check an arbitrary number of paths in other DAGs as well, so I moved the sensor into a function called test in another file called helpers.py. In my_dag.py I use a PythonOperator inside the task group to call test. It looks like this:
with TaskGroup('check_exists') as check_exists:
    path = 's3://my-bucket/data/my_file'
    dependency_name = 'my_file'

    wait_for_dependencies = PythonOperator(
        task_id = 'wait_for_my_file',
        python_callable = test,
        op_kwargs = {
            'dependency_name': dependency_name,
            'path': path
        },
        dag = dag
    )

    wait_for_dependencies
test in helpers.py looks like:
def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
    S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )
However, when I run the DAG, the step is marked as success even though the file is not there. The logs show:
[2022-05-03, 20:07:54 UTC] {python.py:175} INFO - Done. Returned value was: None
[2022-05-03, 20:07:54 UTC] {taskinstance.py:1282} INFO - Marking task as SUCCESS.
It seems Airflow doesn't like using a sensor via a PythonOperator. Is that true? Or am I doing something wrong?
My goal is to loop over multiple paths and check whether each one exists. However, I do this in other DAGs as well, which is why I put the sensor in a function in a separate file.
If there are other ideas on how to do this, I'm open to them!
Thanks for your help!
1 Answer
This will not work as you expect.
You created a case of operator inside operator. See this answer for information about what that means.
In your case you wrapped the S3KeySensor with a PythonOperator. This means that when the PythonOperator runs, it only executes the __init__ function of S3KeySensor; it doesn't invoke the logic of the operator itself. Using an operator inside an operator is bad practice.
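In other words, what effectively runs inside your callable is just a constructor call, as in this sketch (values shortened for illustration):

# Inside test(), this line only builds the sensor object in memory:
sensor = S3KeySensor(
    task_id = 'check_my_file_exists',
    bucket_key = path,
    mode = 'reschedule'
)
# Nothing ever calls sensor.poke(...) or sensor.execute(...), so no S3 check
# happens. The callable then returns None, which is exactly why your log shows
# "Done. Returned value was: None" followed by SUCCESS.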
Your case is even more extreme, as you are trying to use a sensor inside an operator. Sensors need to invoke the poke() function for every poking cycle. To simplify: you cannot enjoy the power of a sensor with mode = 'reschedule' when you set it up as you did, because reschedule means you want to release the worker if the condition is not met yet, but the PythonOperator doesn't know how to do that.
How to solve your issue:
Option 1:
From the code you showed you can simply do:
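For instance (a sketch, assuming test stays in helpers.py and now simply returns the sensor):

# helpers.py -- return the sensor instead of wrapping it in a PythonOperator:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

def test(dependency_name, path, poke_interval = 30, timeout = 60 * 60, mode = 'reschedule'):
    return S3KeySensor(
        task_id = 'check_' + dependency_name + '_exists',
        bucket_key = path,
        poke_interval = poke_interval,
        timeout = timeout,
        mode = mode
    )

# my_dag.py -- calling test() inside the TaskGroup creates the sensor as a
# regular task, so mode = 'reschedule' releases the worker as expected.
# A loop gives you one sensor per path:
with TaskGroup('check_exists') as check_exists:
    for dependency_name, path in [('my_file', 's3://my-bucket/data/my_file')]:
        test(dependency_name, path)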
I don't see a reason why this wouldn't work for you.
Option 2:
If for some reason option 1 is not good for you, then create a custom sensor that also accepts dependency_name and path, and use it like any other operator. I didn't test it, but something like the following should work:
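For example, something along these lines (a sketch; the subclass name MyS3KeySensor is made up here):

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

class MyS3KeySensor(S3KeySensor):
    """S3KeySensor that derives task_id and bucket_key from
    dependency_name and path."""

    def __init__(self, *, dependency_name, path, **kwargs):
        # Derive the task_id unless the caller supplies one explicitly.
        kwargs.setdefault('task_id', 'check_' + dependency_name + '_exists')
        super().__init__(bucket_key = path, **kwargs)

Used like any other operator, for example inside your TaskGroup:

MyS3KeySensor(
    dependency_name = 'my_file',
    path = 's3://my-bucket/data/my_file',
    poke_interval = 30,
    timeout = 60 * 60,
    mode = 'reschedule'
)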