Airflow Docker - NameError when importing function into DAG
I'm currently attempting to build a data pipeline using the puckel docker-airflow 1.10.9 image on Windows.
When the functions are defined in the DAG file itself and called from there, everything works and I can access the output of the code.
However, when I try to import the function from the my_scripts directory, which sits at the same level as the my_dags directory, I get the following error:
NameError: name 'reorder_columns' is not defined
Below is the folder structure I am using for Airflow:
airflow
└── dags
    ├── dockerfiles
    │   └── Dockerfile
    ├── docker-compose.yml
    ├── trading_pipeline
    │   ├── my_dags
    │   │   ├── demand_forecast_dag.py
    │   │   ├── __init__.py
    │   │   └── DataStore.py
    │   ├── my_scripts
    │   │   ├── demand_forecast.py
    │   │   └── __init__.py
    │   └── __init__.py
    └── __init__.py
Below is the demand_forecast_dag file:
try:
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    import requests
    import xml.etree.ElementTree as ET
    import pandas as pd
    import DataStore
    import os
    import datetime
    from dags.trading_pipeline.my_scripts.demand_forecast import reorder_columns
    print("All DAG modules are ok......")
except Exception as e:
    print("Error {} ".format(e))

col_name_1 = "Demand_Forecast"
col_name_2 = "Settlement_Period"
col_name_3 = "Date"

with DAG(
        dag_id="demand_forecast_dag",
        schedule_interval='0 9 * * *',
        default_args={
            "owner": "airflow",
            "retries": 1,
            "retry_delay": datetime.timedelta(minutes=5),
            "start_date": datetime.datetime(2021, 1, 1),
        },
        catchup=False) as f:
    get_demand_forecast = PythonOperator(
        task_id="reorder_columns",
        python_callable=reorder_columns,
        op_args=[col_name_1, col_name_2, col_name_3]
    )
Below is the demand_forecast.py file from which the function is imported into the DAG:
import requests
import xml.etree.ElementTree as ET
import pandas as pd
import DataStore
import os
import datetime

settlement_datetime = datetime.date.today() + datetime.timedelta(days=1)
required_date = settlement_datetime.strftime("%Y-%m-%d")
col_name_1 = "Demand_Forecast"
col_name_2 = "Settlement_Period"
col_name_3 = "Date"


def clock_change_sp(settlement_date):
    """when there is a clock change, there is a different number of settlement periods, this function uses the
    DataStore file to determine the number of sp based on if the required date is a clock change
    :param settlement_date: uses the settlement date at the top of the script
    :return x + 1: the number of required settlement periods plus 1"""
    if settlement_date in DataStore.clocks_forward:
        x = 46
    elif settlement_date in DataStore.clocks_back:
        x = 50
    else:
        x = 48
    return x + 1


def get_demand_forecast(date):
    api_key = "....."
    """uses the BMReports API to pull in the demand forecast for each settlement period of the following day
    :param self: uses the settlement_date and api_key from the Data class
    :return: a list of the demand forecast for the following day"""
    initial_data_list = []
    session = requests.Session()
    for x in range(0, 1, 1):
        url = "https://api.bmreports.com/BMRS/FORDAYDEM/V1?APIKey=" + api_key + "&FromDate=" + date + "&ToDate=" + date + "&ServiceType=xml"
        response = session.get(url)
        request_data = response.text
        root = ET.fromstring(request_data)
        for item in root.findall('.//responseBody/responseList/item/demand'):
            initial_data_list.append(item.text)
    demand_forecast_list = initial_data_list[:clock_change_sp(required_date)-1:]
    return demand_forecast_list


def create_dataframes(col_1):
    """converts the list required list from above into a dataframe, converts it to GW and rounds to 2dp
    :param self: inherits the demand_forecast_list from the previous function, plus col_name_1, which is the name
    of the data column "Demand Forecast"
    :return df: a dataframe with the required data in GW and rounded to 2dp
    """
    df = pd.DataFrame(get_demand_forecast(required_date), columns=[col_1])
    df[col_1] = df[col_1].astype(str).astype(int)
    df[col_1] = df[col_1].div(1000).round(2)
    # print(df)
    return df


def add_dates_and_sp(col_2, col_3, settlement_date):
    """adds the selected date and settlement periods to the dataframe
    :param self: inherits the dataframe from the previous function, the number of settlement periods from the first function,
    plus col_name_2 and col_name_3 which are the SP and Date column names respectively
    :return df: a dataframe with all the collected data plus the Date and Settlement Period"""
    df = create_dataframes(col_name_1)
    df[col_2] = list(range(1, clock_change_sp(required_date)))
    df[col_3] = settlement_date
    # print(df)
    return df


def reorder_columns(col_1, col_2, col_3):
    """reorders the columns so that it reads more easily
    :param self: the dataframe from the previous function with all the data but in the wrong order
    :return df: a dataframe with the columns in the required order"""
    df = add_dates_and_sp(col_2, col_3, required_date)[[col_3, col_2, col_1]]
    print(df)
    return df
I am fairly new to Airflow and Docker, so apologies if this is a simple error.
Let me know if you need any more information.
Thanks in advance,
Josh
Comments (2)
After some troubleshooting and reading more Stack Overflow posts, I managed to import the function from the script in the other folder.
There were two parts to it. The first was setting AIRFLOW_HOME and PYTHONPATH in my Dockerfile.
The second was adding some volumes to my docker-compose.yml file.
I'm not sure this is the best method, but it is working.
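To illustrate why PYTHONPATH matters here, the following is a minimal sketch and not the Dockerfile or docker-compose.yml settings from this answer, which are not shown. It assumes a throwaway directory standing in for the dags folder and a stub reorder_columns, and the helper names build_demo_tree and import_succeeds are made up for the demo. It starts a fresh interpreter twice and checks whether the package import works with and without the parent directory on PYTHONPATH.

```python
import os
import subprocess
import sys
import tempfile

# Hypothetical stand-in for the real demand_forecast.py, kept tiny on purpose.
STUB_SOURCE = (
    "def reorder_columns(col_1, col_2, col_3):\n"
    "    return [col_3, col_2, col_1]\n"
)
IMPORT_LINE = "from trading_pipeline.my_scripts.demand_forecast import reorder_columns"


def build_demo_tree(root):
    """Create root/trading_pipeline/my_scripts/demand_forecast.py plus __init__.py files."""
    package_dir = os.path.join(root, "trading_pipeline", "my_scripts")
    os.makedirs(package_dir)
    for folder in (os.path.join(root, "trading_pipeline"), package_dir):
        open(os.path.join(folder, "__init__.py"), "w").close()
    with open(os.path.join(package_dir, "demand_forecast.py"), "w") as handle:
        handle.write(STUB_SOURCE)


def import_succeeds(pythonpath, workdir):
    """Start a fresh interpreter in workdir and report whether the import works."""
    env = dict(os.environ)
    env.pop("PYTHONPATH", None)  # start from a clean slate
    if pythonpath is not None:
        env["PYTHONPATH"] = pythonpath
    completed = subprocess.run(
        [sys.executable, "-c", IMPORT_LINE],
        env=env, cwd=workdir, capture_output=True, text=True,
    )
    return completed.returncode == 0


with tempfile.TemporaryDirectory() as dags_root, tempfile.TemporaryDirectory() as elsewhere:
    build_demo_tree(dags_root)
    # Run from an unrelated directory so the current directory cannot help.
    without_pythonpath = import_succeeds(None, elsewhere)
    with_pythonpath = import_succeeds(dags_root, elsewhere)

print(without_pythonpath, with_pythonpath)
```

The import path in the DAG has to be written relative to whichever directory ends up on PYTHONPATH inside the container, so the prefix used in the import statement and the mounted volume need to agree.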
It's always nice to keep functions in a separate file so the code stays clean, but you don't have to import a single function. I think your import is wrong; you can import the module by its file name instead.
Also import os and sys alongside the file import so that the file can be found; as of now it is not reading the files.
You can use python_callable, but you can also do it like this:
var1 = reorder_columns(col_1, col_2, col_3)
and then put var1 in your python_callable.
Try this and let me know if you face any problems.
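The import statement from this answer did not survive, so the following is only a sketch of what importing by file name with os and sys could look like. It assumes a temporary folder standing in for my_scripts and a stub reorder_columns. In a real DAG file the folder would more likely be built from os.path.dirname(__file__), which is also an assumption.

```python
import os
import sys
import tempfile

# Temporary folder standing in for my_scripts (assumption for the demo).
scripts_dir = tempfile.mkdtemp()
with open(os.path.join(scripts_dir, "demand_forecast.py"), "w") as handle:
    # Stub replacing the real function so the sketch runs without pandas.
    handle.write(
        "def reorder_columns(col_1, col_2, col_3):\n"
        "    return [col_3, col_2, col_1]\n"
    )

# Make the folder importable for this interpreter only.
# In a DAG this might be something like:
#   os.path.join(os.path.dirname(__file__), "..", "my_scripts")
sys.path.insert(0, scripts_dir)

import demand_forecast  # import the whole module by its file name

var1 = demand_forecast.reorder_columns("Demand_Forecast", "Settlement_Period", "Date")
print(var1)
```

Unlike PYTHONPATH, a sys.path change made this way only affects the process that runs it, so it would have to sit at the top of every DAG file that needs the scripts.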