Need to back up data, but it's not working in Airflow

Posted 2025-02-05 18:01:37


I am trying to back up data in Airflow. The DAG doesn't raise any error, but the data is never backed up because everything gets skipped. The code I have written is:

import os
from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_gcs import (
    BigQueryToGCSOperator,
)
from composer_plugins import get_list_to_backup
from datetime import datetime, timedelta, date
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.operators.python import PythonOperator

"""the function validates if the schedule_day parameter is a valid day to execute the task schedule_day is a number and corresponds to day of the week
1 - Monday
2 - Tuesday
3 - Wednesday
4 - Thursday
5 - Friday
6 - Saturday
7 - Sunday """
def _check_valid_day(**kwargs):
    today = datetime.today()
    if today.isoweekday()==kwargs["schedule_day"]:
        return True
    else:
     raise AirflowSkipException("does not correspond to the backup day")

today = datetime.today()
today_str = today.strftime("%Y-%m-%d")
#start_date = get_next_weekday(today_str, 5)  # 5 = Saturday
start_date = datetime(2022, 5, 2)
dag_id = "data_bq_weekly_backups_dag"

event_collection_project_id = os.environ["EVENT_COLLECTION_PROJECT_ID"]
tables_to_backup = os.environ["TABLES_TO_BACKUP"]
destination_bucket = os.environ["WEEKLY_BQ_BACKUP_BUCKET"]
schedule_day = os.environ["BACKUP_SCHEDULE_DAY"]

default_dag_args = {
    # Setting start date for next Saturday in order to maintain the scheduler
    # in a consistent state
    "start_date": start_date,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
    # If a task fails, retry it once after waiting at least what's specified in retry_delay
    "retries": 1,
    "retry_delay": timedelta(seconds=10),
    "project_id": event_collection_project_id,
    "schedule_interval": "0 2 * * *",
}


tables_to_backup_list = get_list_to_backup(tables_to_backup)

with DAG(dag_id=dag_id, default_args=default_dag_args, catchup=False) as dag:

    check_valid_day = PythonOperator(
        task_id='check_valid_day',
        python_callable=_check_valid_day,
        op_kwargs={
            "schedule_day": schedule_day,
        },
    )
    task_dict = dict()

    for table_to_backup in tables_to_backup_list:
        dataset = table_to_backup.split(".")[0]
        table = table_to_backup.split(".")[1]
        task_name = f"{dataset}_{table}_table_weekly_backup"
        task_dict[task_name] = BigQueryToGCSOperator(
            task_id=task_name,
            trigger_rule="all_success",
            dag=dag,
            source_project_dataset_table=table_to_backup,
            destination_cloud_storage_uris=[
                f"gs://{destination_bucket}/{dataset}/{table}/{today.year}/{today.month}/{today.day}/{table}-*.avro"
            ],
            export_format="AVRO",  # OPTIONS: AVRO, CSV, JSON
            compression="NONE",  # OPTIONS: NONE, DEFLATE, GZIP, SNAPPY
            labels=None,
        )
        check_valid_day >> task_dict[task_name]

When I execute this DAG, there is no error, but everything gets skipped:

[Screenshot: Airflow DAG tree view]

BACKUP_SCHEDULE_DAY=3 is set in the environment variables file. I don't know what's wrong here or why it's not working.


捎一片雪花 2025-02-12 18:01:37


I tried your code and was able to reproduce your issue. See the run history below:

[Screenshot: run history]

NOTE: Before running your code, I hardcoded values for your environment variables and for tables_to_backup_list to make it work in my environment.
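For reference, here is a minimal sketch of how such stand-ins might look; the project, bucket, and table names are made up, and since the composer_plugins module is not shown, get_list_to_backup is assumed to simply split a comma-separated "dataset.table" string:

# Illustrative stand-ins only -- the real values come from Composer environment
# variables and from composer_plugins.get_list_to_backup.
event_collection_project_id = "my-test-project"  # hypothetical GCP project ID
tables_to_backup = "my_dataset.my_table"  # comma-separated "dataset.table" pairs
destination_bucket = "my-backup-bucket"  # hypothetical GCS bucket
schedule_day = "3"  # note: environment variables are always strings

def get_list_to_backup(tables):
    # Assumed behaviour: split a comma-separated "dataset.table" string into a list.
    return [t.strip() for t in tables.split(",") if t.strip()]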

The main problem is in _check_valid_day(). When the line if today.isoweekday() == kwargs["schedule_day"] is executed, the comparison is always False because of a data type mismatch:

print(type(today.isoweekday()))      # <class 'int'>
print(type(kwargs["schedule_day"]))  # <class 'str'>

The fix is to make their data types match. The version below converts kwargs["schedule_day"] to int:

def _check_valid_day(**kwargs):
    today = datetime.today()
    if today.isoweekday() == int(kwargs["schedule_day"]): #convert to int
        print("inside the if statement")
        return True
    else:
        raise AirflowSkipException("does not correspond to the backup day")
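Equivalently, you could leave _check_valid_day unchanged and do the conversion once, where the environment variable is read; os.environ values are always strings, so casting at that boundary keeps the callable simple. A sketch of that variant:

import os

# Cast once where the configuration is read, so every consumer of schedule_day
# sees an int and the weekday comparison behaves as intended.
schedule_day = int(os.environ["BACKUP_SCHEDULE_DAY"])

Either way, once check_valid_day succeeds on the configured weekday, the downstream BigQueryToGCSOperator tasks (trigger_rule="all_success") run instead of being skipped.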

Graph view:

[Screenshot: graph view]

check_valid_day Logs:

[Screenshot: check_valid_day task logs]
