如何计算Apache气流传感器总执行时间
sensor_job = PythonSensor(
task_id='sensor_id',
python_callable=call_jobsensor,
poke_interval=10,
timeout=7 * 60,
mode='reschedule',
)
def call_jobsensor():
# start timer
# do something
# stop timer
用例 - 在此示例中,我试图捕获完成传感器作业所花费的时间。在这里完成意味着它应该超时,或者传感器返回true。 如果我使用传统方法,它将以小块(传感器调用此功能调用)返回时间信息,但我需要的是总时间。
我可以在这里使用气流或统计数据的任何方法或支持吗?还是其他建议?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
这是假设您正在使用气流V2:
您可以使用气流数据库后端将任务的持续时间用于DAG。
这是您假设您已经设置了并配置了默认
airflow_db
与主机,架构,用户名和密码连接。根据所使用的数据库,您需要安装相关的提供商。
您可以查看
apache-airflow-providers-mysql
在这里。和
apache-airflow-providers-postgres的文档
在这里。您需要在全球范围内将提供商安装到气流环境中。
task_instance
表包含每个任务运行的持续时间。没有一种简单的方法可以查看表的模式,一种方法是使用第三方工具(例如DBEAVER)并连接到数据库并检查表 +模式。使用PostgreSQL访问的示例DAG为(如果使用MySQL只需将导入交换为
mySqlhook
):run_query
is的日志输出IS:This assumes you are using Airflow v2:
You can use the airflow database backend to get the duration of tasks in a DAG.
This assumes you have set up an airflow database backend and configured the default
airflow_db
connection with your host, schema, username and password.Depending on what database you are using, you'll need to install the relevant provider.
You can view the documentation for
apache-airflow-providers-mysql
here.And the documentation for
apache-airflow-providers-postgres
here.You'll need to globally install the provider to the airflow environment.
The
task_instance
table contains the duration that each task took to run. There isn't an easy way to see the schemas of the tables, one way is to use a 3rd party tool (such as DBeaver) and connect to the database and inspect the tables + schemas.An example DAG for accessing using PostgreSQL is (if using MySQL just swap the import for the
MySQLHook
):The log output of
run_query
is: