I need to clean up databases (Postgres/Mongo) before backfilling tasks that belong to an Airflow DAG. Let's assume this simple DAG:
      -> task2
task1          -> task4
      -> task3
Each task writes its results to a database.
On normal daily execution everything is fine: the databases hold no data yet for the day being processed.
But when it comes to backfilling this DAG, the past days have already been processed and we need the backfill to compute the results once more (e.g. the code for task2 was changed and we need a refresh). The databases already contain data for those past days, so errors such as duplicateError will be raised during the backfill. So for now I have to manually delete the data for the tasks that need backfilling before I run the backfill.
What I want: for each task, write a script that deletes that task's data from the databases for the current execution date, and run it just before the real task that fills the databases again. For performance reasons, though, I want this deletion script to run only during a backfill, not on the daily execution of the DAG.
What I am missing is how to tell whether a task is being run by a backfill or by the normal daily execution (some flag, etc.). Or is there perhaps an option to assign something like a pre_task_execute_when_backfill_callable function somewhere in an Airflow task?
Comments (1)
For now I have ended up with two methods.
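A minimal sketch of what two such methods might look like, assuming a daily schedule. Only the name delete_day_data_when_backfill comes from this answer; is_backfill, the delete_day_data parameter, and the optional today argument (added so the check can be exercised with a fixed date) are hypothetical names chosen for illustration.

```python
from datetime import date, datetime, timedelta
from typing import Callable, Optional


def _as_day(value: date) -> date:
    # datetime is a subclass of date, so reduce it to a plain calendar day.
    return value.date() if isinstance(value, datetime) else value


def is_backfill(execution_day: date, today: Optional[date] = None) -> bool:
    """Return True when execution_day is earlier than yesterday.

    Assumption: on a daily schedule the regular run processes yesterday,
    so anything older is treated as a backfill.
    """
    today = _as_day(today) if today else date.today()
    return _as_day(execution_day) < today - timedelta(days=1)


def delete_day_data_when_backfill(
    execution_day: date,
    delete_day_data: Callable[[date], None],
    today: Optional[date] = None,
) -> bool:
    """Run delete_day_data for execution_day only when this looks like a backfill.

    Returns True if the cleanup ran, False if it was skipped.
    """
    if not is_backfill(execution_day, today):
        return False
    delete_day_data(_as_day(execution_day))
    return True
```

The comparison is done on calendar days rather than full timestamps: comparing a midnight execution timestamp against "now minus one day" would also flag the regular daily run as a backfill.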
I call delete_day_data_when_backfill() at the start of each task that needs it, passing the function that cleans the databases for that task as a parameter. A DAG task module then makes this call as the first step of each task callable.
To sum up:
What I was missing was a way to tell whether Airflow is executing a backfill or the normal daily task. So the answer to the question is this check: if the execution day is less than today() - timedelta(1), the run is a backfill in my situation (where tasks are executed on a daily basis). It is a very simple condition. I haven't found any flag for this in the Airflow documentation, but I didn't search for long. Hopefully that helps somebody :)
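As an illustration of the task module described above, here is a hedged sketch that runs without Airflow. Everything except delete_day_data_when_backfill is hypothetical: RESULTS is an in-memory stand-in for the Postgres/Mongo storage, and insert_result, delete_task2_data and task2 are made-up names. In a real DAG the callable would be attached to an operator such as PythonOperator, and how the execution date reaches it depends on the Airflow version.

```python
from datetime import date, timedelta

# Hypothetical stand-in for a table or collection, keyed by (task name, day).
# Inserting an existing key fails, much like a unique index would.
RESULTS = {}


def insert_result(task, day, value):
    key = (task, day)
    if key in RESULTS:
        raise KeyError(f"duplicate result for {key}")
    RESULTS[key] = value


def delete_task2_data(day):
    # Cleanup function for task2: drop that day's row if it exists.
    RESULTS.pop(("task2", day), None)


def delete_day_data_when_backfill(execution_day, delete_day_data, today=None):
    # Same condition as in the answer: older than yesterday means backfill.
    today = today or date.today()
    if execution_day < today - timedelta(days=1):
        delete_day_data(execution_day)


def task2(execution_day, today=None):
    # First step of the task: clean up, but only on a backfill.
    delete_day_data_when_backfill(execution_day, delete_task2_data, today)
    insert_result("task2", execution_day, "fresh")


today = date(2024, 1, 10)
insert_result("task2", date(2024, 1, 5), "stale")  # left over from an old run
task2(date(2024, 1, 5), today)  # backfill: stale row is deleted, then rewritten
task2(date(2024, 1, 9), today)  # regular daily run: no cleanup needed
```

Rerunning the regular day a second time would still hit the duplicate error, since the cleanup is skipped for it by design.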