How to calculate the total execution time of an Apache Airflow sensor



from airflow.sensors.python import PythonSensor


def call_jobsensor():
    # start timer
    # do something
    # stop timer
    return True  # the sensor completes when this callable returns True


sensor_job = PythonSensor(
    task_id='sensor_id',
    python_callable=call_jobsensor,
    poke_interval=10,
    timeout=7 * 60,
    mode='reschedule',
)

Use Case - In this example, I am trying to capture the total time taken to complete the sensor job. Here, completing means either the sensor times out or it returns true.
If I go by the traditional method, it will return the time information in small chunks (every time the sensor pokes, it calls this function), but what I need is the total time.

Is there any method or support from Airflow or StatsD that I can use here? Or any other suggestion?


Comments (1)

孤单情人 2025-02-13 08:00:27


This assumes you are using Airflow v2:

You can use the airflow database backend to get the duration of tasks in a DAG.

This assumes you have set up an airflow database backend and configured the default airflow_db connection with your host, schema, username and password.
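
If you are not sure whether the airflow_db connection is actually configured in your environment, a quick sanity check (a minimal sketch; the connection ID airflow_db matches the one used below) is to resolve it with BaseHook from inside a task or a Python shell on the Airflow host:

from airflow.hooks.base import BaseHook

# Resolve the metadata-database connection by ID; this raises an error
# if no connection named 'airflow_db' is configured.
conn = BaseHook.get_connection("airflow_db")
print(conn.conn_type, conn.host, conn.schema, conn.login)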

Depending on what database you are using, you'll need to install the relevant provider.

You can find the details in the documentation for apache-airflow-providers-mysql or apache-airflow-providers-postgres, depending on your database.

You'll need to install the provider globally into the Airflow environment.

The task_instance table contains the duration that each task took to run. There isn't an easy way to see the schemas of the tables; one way is to use a third-party tool (such as DBeaver) to connect to the database and inspect the tables and their schemas.
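
If you would rather not set up a separate database client, an alternative (a minimal sketch, assuming the same airflow_db PostgreSQL connection used below) is to ask PostgreSQL itself for the columns of task_instance via information_schema:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="airflow_db")
# information_schema.columns lists every column of task_instance,
# including start_date, end_date and duration.
columns = hook.get_records(
    sql="select column_name, data_type from information_schema.columns "
        "where table_name = 'task_instance' order by ordinal_position"
)
for name, data_type in columns:
    print(name, data_type)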

An example DAG that reads the duration using PostgreSQL is below (if you are using MySQL, just swap the import for the MySqlHook):

import pendulum

# from airflow.providers.mysql.hooks.mysql import MySqlHook  # install apache-airflow-providers-mysql
from airflow.providers.postgres.hooks.postgres import PostgresHook  # install apache-airflow-providers-postgres
from airflow.decorators import dag, task


@dag(start_date=pendulum.yesterday(tz="Europe/London"))
def test_dag():
    @task()
    def test_duration_task():
        # or a PythonSensor operator...
        from time import sleep
        sleep(3)
        
    @task()
    def run_query(**context):
        postgres_hook = PostgresHook(postgres_conn_id="airflow_db")
        records = postgres_hook.get_records(sql="select duration from task_instance where task_id='test_duration_task' order by end_date desc")
        print(f"task duration for test_duration_task = {records[0][0]}")

    test_duration_task() >> run_query()


test_dag_failure_dag = test_dag()

The log output of run_query is:

[2022-06-10, 19:57:59 UTC] {base.py:68} INFO - Using connection ID '***_db' for task execution.
[2022-06-10, 19:57:59 UTC] {logging_mixin.py:115} INFO - task duration for test_duration_task = 3.200903
[2022-06-10, 19:57:59 UTC] {python.py:173} INFO - Done. Returned value was: None
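
To tie this back to the original PythonSensor in mode='reschedule': each poke runs as a short, separate execution, so the duration stored on the task instance may not cover the whole waiting period. One way to get the total wall-clock time from the first poke until the sensor finally succeeds or times out is to combine task_instance with the task_reschedule table, which records every poke. The helper below (total_sensor_time is a hypothetical name) is an untested sketch that assumes Airflow 2.2+, where both tables are keyed by run_id, and the same airflow_db PostgreSQL connection as above:

from airflow.providers.postgres.hooks.postgres import PostgresHook


def total_sensor_time(dag_id, task_id):
    """Seconds between the sensor's first poke and its final end_date, for the latest finished run."""
    hook = PostgresHook(postgres_conn_id="airflow_db")
    row = hook.get_first(
        sql="""
            select extract(epoch from (ti.end_date - min(tr.start_date)))
            from task_instance ti
            join task_reschedule tr
              on tr.dag_id = ti.dag_id
             and tr.task_id = ti.task_id
             and tr.run_id = ti.run_id
            where ti.dag_id = %(dag_id)s
              and ti.task_id = %(task_id)s
              and ti.end_date is not null
            group by ti.run_id, ti.end_date
            order by ti.end_date desc
            limit 1
        """,
        parameters={"dag_id": dag_id, "task_id": task_id},
    )
    return row[0] if row else None


# Example (placeholder DAG ID): total_sensor_time("my_dag", "sensor_id") after the sensor has finished.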