Retrieving an XCom value and passing it to the EMR operators in Airflow
I am trying to retrieve a value from a PythonOperator and pass it to the EMR "create job flow" and "add steps" operators. How do I pass this value into SPARK_STEPS? I have tried accessing it with task_instance, but it didn't work. Can someone tell me how to access it?
Error:
NameError: name 'task_instance' is not defined
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.models.connection import Connection
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.base_hook import BaseHook
import boto3
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from airflow.operators.dummy import DummyOperator
import pendulum
from airflow.operators.python import PythonOperator
import json, os
import pytz
from airflow.models import Variable
os.environ['AWS_ACCESS_KEY_ID']="11111"
os.environ['AWS_SECRET_ACCESS_KEY']="111111"
os.environ['AWS_DEFAULT_REGION']='us-west-1'
account_id = boto3.client('sts').get_caller_identity().get('Account')
def get_secret():
print("started")
secret_name = Variable.get("secret_name")
region_name = Variable.get("region_name")
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)
account_id = boto3.client('sts').get_caller_identity().get('Account')
try:
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret_str = get_secret_value_response['SecretString']
secret=json.loads(secret_str)
airflow_path=secret["airflow_path"]
return airflow_path
except Exception as e:
print("AWS Exception raised :" +e)
raise
local_tz = pendulum.timezone("America/Chicago")
DAG_ID = os.path.basename(__file__).replace(".py", "")
os.environ['AWS_DEFAULT_REGION'] = 'us-west-1'
DEFAULT_ARGS = {
'owner': 'Sam Kurth',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
}
SPARK_STEPS = [
{
'Name': 'Spark-Submit Command',
"ActionOnFailure": "CONTINUE",
'HadoopJarStep': {
"Jar": "command-runner.jar",
"Args": [
'spark-submit',
'--py-files',
's3://'+{{ task_instance.xcom_pull('get_aws_fields', key='return_value') }}+'-pyspark/hell/config.zip,s3://'+{{ task_instance.xcom_pull('get_aws_fields', key='return_value') }}+'-pyspark/hell/jobs.zip,s3://'+path+'-pyspark/hell/DDL.zip',
's3://'+path+'-pyspark/hell/main.py'
],
},
},
{
'Name': 'Copy Test Scripts',
"ActionOnFailure": "TERMINATE_CLUSTER",
'HadoopJarStep': {
"Jar": "command-runner.jar",
"Args": ["aws","s3","cp","s3://"+{{task_instance.xcom_pull(task_ids='get_aws_fields', key='return_value') }}+"-pyspark/hell/run_test.sh","/home/hadoop/"],
}
},
{
'Name': 'Execute Test Scripts',
"ActionOnFailure": "TERMINATE_CLUSTER",
'HadoopJarStep': {
"Jar": "command-runner.jar",
"Args": ["sh","/home/hadoop/run_test.sh"],
}
}
]
JOB_FLOW_OVERRIDES = {
"Name": "Hell ETL",
"LogUri": "s3://aws-logs-559293306438-us-west-1/elasticmapreduce/",
"ReleaseLabel": "emr-6.2.0",
"Applications": [
{
"Name": "Spark"
},
],
"Instances": {
"Ec2SubnetId": "subnet-0d626501d9db34925",
"InstanceGroups": [
{
"Name": "Master nodes",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
}
],
"KeepJobFlowAliveWhenNoSteps": False,
"TerminationProtected": False,
},
"BootstrapActions": [
{
"Name": "Install Dependencies",
"ScriptBootstrapAction": {
"Path": "s3://"+path+"-pyspark/hell/install_python_modules.sh",
}
}
],
}
with DAG(
dag_id=DAG_ID,
description='Run built-in Spark app on Amazon EMR',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
start_date=pendulum.datetime(2022, 4, 19,tz=local_tz),
schedule_interval='05 7 * * *',
tags=['emr-dev'],
) as dag:
get_aws_secret=[PythonOperator(task_id='get_aws_fields',python_callable=get_secret)]
cluster_creator = EmrCreateJobFlowOperator(ti,task_id='create_job_flow',job_flow_overrides=JOB_FLOW_OVERRIDES)
step_adder = EmrAddStepsOperator(task_id='add_steps',job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',steps=SPARK_STEPS, )
step_checker = EmrStepSensor(task_id='watch_step',job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",aws_conn_id='aws_default',)
get_aws_secret>>cluster_creator >> step_adder >> step_checker
1 Answer
The {{ task_instance }} expression in your SPARK_STEPS is not contained in a string, so Python tries to evaluate it when the DAG file is parsed instead of leaving it for Jinja to render at run time; it should sit inside the string itself, as in the sketch below. Also, path in 's3://'+path+'-pyspark/hell/main.py' isn't defined anywhere in the DAG either.
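A minimal sketch of the corrected SPARK_STEPS, assuming the return value of get_aws_fields is the bucket prefix that the undefined path was meant to hold (S3_PREFIX is only a helper name introduced here, not part of the original DAG). Because steps is a templated field on EmrAddStepsOperator, the Jinja expression is rendered at run time as long as it stays inside the string:

# Assumption: the XCom pushed by 'get_aws_fields' is the bucket prefix.
# The Jinja expression lives inside a plain string, so it is only rendered
# when EmrAddStepsOperator templates its `steps` argument at run time.
S3_PREFIX = "s3://{{ task_instance.xcom_pull(task_ids='get_aws_fields', key='return_value') }}-pyspark/hell"

SPARK_STEPS = [
    {
        "Name": "Spark-Submit Command",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--py-files",
                f"{S3_PREFIX}/config.zip,{S3_PREFIX}/jobs.zip,{S3_PREFIX}/DDL.zip",
                f"{S3_PREFIX}/main.py",
            ],
        },
    },
    {
        "Name": "Copy Test Scripts",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["aws", "s3", "cp", f"{S3_PREFIX}/run_test.sh", "/home/hadoop/"],
        },
    },
    # The 'Execute Test Scripts' step needs no change, since it contains no template.
]

job_flow_overrides is likewise a templated field on EmrCreateJobFlowOperator (at least on recent Amazon provider versions), so the same Jinja string can replace the undefined path in the bootstrap-action path of JOB_FLOW_OVERRIDES instead of relying on a module-level variable that was never defined.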