使用 docker-compose 和 Airflow 进行 Python cron 作业
我正在尝试使用 Docker 和 Airflow 运行这个 Pyhon 函数,它应该是一个简单的 cron 作业,但我无法让它工作。我对他们俩都没有经验。
下面是我的 Python 代码,其正下方是在 Docker 容器内运行的 Airflow 中引发的错误。
Docker 版本:20.10.12 Python 3.9
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import datetime
now = datetime.datetime.now()
seven_days_ago = datetime.datetime.combine(datetime.datetime.today() - timedelta(7),
datetime.datetime.min.time())
def f_run():
print('blabla')
return f_run
runner = f_run()
default_args = {
'owner': 'teste',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
args = {
'owner': 'renan',
'start_date': seven_days_ago # make start date in the past
}
dag = DAG(
dag_id='teste',
default_args=args,
schedule_interval='@daily' # make this workflow happen every day
)
with dag:
func = PythonOperator(
task_id='run',
python_callable = runner
)
在气流中引发的错误是:
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type function is not JSON serializable
I'm trying to run this Pyhon function with Docker and Airflow, it should be a simple cron job but I can't make it work. I am very unexperienced with both of them.
Below is my Python code and right below it is the error raised in Airflow which is running inside a Docker container.
Docker version: 20.10.12
Python 3.9
from airflow.operators.bash import BashOperator
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta
import datetime
now = datetime.datetime.now()
seven_days_ago = datetime.datetime.combine(datetime.datetime.today() - timedelta(7),
datetime.datetime.min.time())
def f_run():
print('blabla')
return f_run
runner = f_run()
default_args = {
'owner': 'teste',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
args = {
'owner': 'renan',
'start_date': seven_days_ago # make start date in the past
}
dag = DAG(
dag_id='teste',
default_args=args,
schedule_interval='@daily' # make this workflow happen every day
)
with dag:
func = PythonOperator(
task_id='run',
python_callable = runner
)
And in airflow the error raised is:
raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type function is not JSON serializable
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论