How to retrieve data from a Python function and use it in an EMR operator

Posted on 2025-02-06 13:26:39 · 1,537 characters · 0 views · 0 comments


Airflow version: 2.0.2
Trying to create an EMR cluster by retrieving data from AWS Secrets Manager.

I am trying to write an Airflow DAG; my task is to get data from this get_secret function and use it in SPARK_STEPS:

import json

import boto3
from airflow.models import Variable


def get_secret():
    secret_name = Variable.get("secret_name")
    region_name = Variable.get("region_name")  # the Variable key must be quoted
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)

    try:
        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    except client.exceptions.ClientError:
        # Surface Secrets Manager errors (missing secret, permissions, etc.)
        raise
    if 'SecretString' in get_secret_value_response:
        secret = json.loads(get_secret_value_response['SecretString'])
        airflow_path = secret["airflow_path"]
        return airflow_path

...

I need to use the "airflow_path" return value in SPARK_STEPS below:

SPARK_STEPS = [
    {
        'Name': 'Spark-Submit Command',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--py-files',
                's3://' + airflow_path + '-pyspark/pitchbook/config.zip,s3://' + airflow_path + '-pyspark/pitchbook/jobs.zip,s3://' + airflow_path + '-pyspark/pitchbook/DDL.zip',
                's3://' + airflow_path + '-pyspark/pitchbook/main.py',
            ],
        },
    },
]

I saw on the internet that I need to use XCom. Is this right? And do I need to run this function in a PythonOperator first and then get the value? Please provide an example, as I am a newbie.

Thanks for your help.
Xi


Comments (1)

流殇 2025-02-13 13:26:39

Yes, if you want to pass something dynamic, it is probably easier to leverage XCom push/pull.
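To see why this works: Airflow renders templated operator fields with Jinja at runtime, and `ti.xcom_pull(...)` inside a template resolves against the running task instance. A minimal sketch of that rendering step, using jinja2 directly with a stand-in `ti` object (the class name `FakeTI` and the bucket value are illustrative, not Airflow internals):

```python
from jinja2 import Template


class FakeTI:
    """Stand-in for Airflow's task-instance object; only xcom_pull is needed here."""

    def xcom_pull(self, task_ids, key="return_value"):
        # Pretend the 'get_secret' task returned this bucket prefix.
        return "my-airflow-bucket"


# The same template string you would put in a templated operator field.
template = Template("s3://{{ ti.xcom_pull(task_ids='get_secret') }}-pyspark/pitchbook/main.py")
rendered = template.render(ti=FakeTI())
print(rendered)  # s3://my-airflow-bucket-pyspark/pitchbook/main.py
```

Airflow does this substitution for you on every field listed in an operator's `template_fields`, recursing into nested lists and dicts, which is why a Jinja expression inside `SPARK_STEPS` gets replaced before the step is submitted to EMR.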

Leverage the PythonOperator to push the data into XCom.

See these reference implementations:

- the Amazon provider's EMR example DAG (example_emr.py) in the apache/airflow repository on GitHub
- https://www.startdataengineering.com/post/how-to-submit-spark-jobs-to-emr-cluster-from-airflow/
