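How to retrieve data from a Python function and use it in an EMR operator
Airflow version: 2.0.2. I am trying to create an EMR cluster using data retrieved from AWS Secrets Manager.
I am writing an Airflow DAG, and my task is to get data from this get_secret function and use it in SPARK_STEPS: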
import json

import boto3
from airflow.models import Variable


def get_secret():
    secret_name = Variable.get("secret_name")
    region_name = Variable.get("region_name")  # the Variable key must be a quoted string
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    account_id = boto3.client('sts').get_caller_identity().get('Account')
    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
        if 'SecretString' in get_secret_value_response:
            secret_str = get_secret_value_response['SecretString']
            secret = json.loads(secret_str)
            airflow_path = secret["airflow_path"]
            return airflow_path
    ...

I need to use the returned airflow_path value in the SPARK_STEPS below:
SPARK_STEPS = [
    {
        'Name': 'Spark-Submit Command',
        "ActionOnFailure": "CONTINUE",
        'HadoopJarStep': {
            "Jar": "command-runner.jar",
            "Args": [
                'spark-submit',
                '--py-files',
                's3://'+airflow_path+'-pyspark/pitchbook/config.zip,s3://'+airflow_path+'-pyspark/pitchbook/jobs.zip,s3://'+airflow_path+'-pyspark/pitchbook/DDL.zip',
                's3://'+airflow_path+'-pyspark/pitchbook/main.py'
            ],
        },
    },
]
I read online that I need to use XCom. Is that right? I am new to this, so please provide an example.
Thanks for your help.
Comments (1)
Yes, if you want to pass dynamic values between tasks, using XCom push/pull is probably the easiest way.
Use a PythonOperator to push the data into XCom.
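As a minimal sketch of the push side: a PythonOperator stores whatever its callable returns in XCom under the key return_value, so the callable only has to return the value. The helper below is illustrative; fetch_airflow_path and its client parameter are names introduced here, and client is assumed to be a boto3 Secrets Manager client (or any object exposing the same get_secret_value method).

```python
import json


def fetch_airflow_path(client, secret_name):
    """Return the 'airflow_path' field of a JSON secret.

    `client` is assumed to behave like a boto3 Secrets Manager client.
    """
    response = client.get_secret_value(SecretId=secret_name)
    if "SecretString" not in response:
        # Binary secrets are not handled in this sketch.
        raise ValueError("secret has no SecretString")
    secret = json.loads(response["SecretString"])
    return secret["airflow_path"]
```

If a function like this is wrapped as the python_callable of a PythonOperator, a downstream task can read the result with xcom_pull using that task's task_id.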
See reference implementation:
https://github.com/apache/airflow/blob/7fed7f31c3a895c0df08228541f955efb16fbf79/airflow/providers/amazon/aws/example_dags/example_emr.py
https://github.com/apache/airflow/blob/7fed7f31c3a895c0df08228541f955efb16fbf79/airflow/providers/amazon/aws/example_dags/example_emr.py#L108
https://www.startdataengineering.com/post/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/
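For the pull side, here is a minimal sketch of how the SPARK_STEPS from the question could refer to the XCom value through a Jinja template instead of a module-level Python variable. It assumes the task that returns airflow_path has the task_id get_secret (a hypothetical name) and that the steps are passed to EmrAddStepsOperator, whose steps argument is a templated field. build_spark_steps is an illustrative helper, not part of Airflow.

```python
# Jinja expression rendered by Airflow at run time.
# 'get_secret' is an assumed task_id for the task that returns airflow_path.
AIRFLOW_PATH = "{{ task_instance.xcom_pull(task_ids='get_secret', key='return_value') }}"


def build_spark_steps(path_expr=AIRFLOW_PATH):
    """Build the EMR step list, leaving the bucket prefix as a template expression."""
    base = "s3://" + path_expr + "-pyspark/pitchbook/"
    py_files = ",".join(base + name for name in ("config.zip", "jobs.zip", "DDL.zip"))
    return [
        {
            "Name": "Spark-Submit Command",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", "--py-files", py_files, base + "main.py"],
            },
        }
    ]


SPARK_STEPS = build_spark_steps()
```

Passing steps=SPARK_STEPS to EmrAddStepsOperator should then let Airflow substitute the real value when the task runs, because templated fields are rendered recursively through lists and dicts.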