气流emraddstepsoperator无法执行火花阴影罐

发布于 2025-02-05 09:22:31 字数 10276 浏览 2 评论 0原文

Spark App的步骤类型应该是什么。.我面临的问题是,主类型未设置或无法识别纱线..似乎将应用程序视为简单的jar而不是Spark提交模式。 dag,error和emr屏幕截图

Amazon Emr Cloud Console手动添加Spark作业,作为步骤

=“ https://i.sstatic.net/oeflx.png” rel =“ nofollow noreferrer”>在添加spark jar键入步骤而不是自定义jar步骤之后。给出spark spark sump args and args and main method args args args agr

步骤类型可以流式传输或spark应用程序或自定义jar

我的错误消息:

> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

这是一个AWS EMR管道的示例DAG。

首先创建集群,添加步骤/操作,检查步骤以及完成后完成 终止集群。

import time    
from airflow.operators.python import PythonOperator     
from datetime import timedelta    

from airflow import DAG    
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator    
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator    
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator     
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor    
from airflow.utils.dates import days_ago    

SPARK_STEPS = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['spark-submit',    
                    '--deploy-mode',    
                    'cluster',    
                    '--master',    
                    'yarn',    
                    '--class',    
                    'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',    
                '--deploy-mode',    
                'client',    
                '--master',    
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS3 = [    
    {    
        'Name': 'sadim_test3',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
            'MainClass': 'com.sadim.TestSadim',    
            'Args': ['spark-submit',     
                '--deploy-mode',    
                'client',     
                '--master',     
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS4 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                'spark-submit',    
                '--deploy-mode',    
                'client',    
                 '--master',    
                 'yarn',                    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
SPARK_STEPS5 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
JOB_FLOW_OVERRIDES = {    
    'Name': 'ob_emr_airflow_automation',    
    'ReleaseLabel': 'emr-6.6.0',    
    'LogUri': 's3://test-data/emr_logs/',    
    'Instances': {    
        'InstanceGroups': [    
            {    
                'Name': 'Master node',    
                'Market': 'ON_DEMAND',    
                'InstanceRole': 'MASTER',    
                'InstanceType': 'm5.xlarge',    
                'InstanceCount': 1    
            },    
            {    
                    'Name': "Slave nodes",    
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',    
                    'InstanceType': 'm5.xlarge',    
                    'InstanceCount': 1    
            }    
        ],    
        'Ec2SubnetId': 'subnet-03129248888a14196',    
        'Ec2KeyName': 'datalake-emr-nodes',    
        'KeepJobFlowAliveWhenNoSteps': True,    
        'TerminationProtected': False    
    },    
    'BootstrapActions': [    
                {    
                        'Name': 'Java11InstallBootstrap',    
                        'ScriptBootstrapAction': {    
                            'Path': 's3://test-data/jars/bootstrap.sh',    
                            'Args': [    
                            ]    
                        }    
                }    
    ],    
    'Configurations': [    
        {    
            "Classification":"spark-defaults",    
                    "Properties":{    
                    "spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' -    XX:MaxHeapFreeRatio=70",    
                    "spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time -    XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
                                }    
        }    
    ],    
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
    'ServiceRole': 'EMR_DefaultRole',    
}    

with DAG(    
    dag_id='emr_job_flow_manual_steps_dag_v6',    
    default_args={    
        'owner': 'airflow',    
        'depends_on_past': False,    
        'email': ['[email protected]'],    
        'email_on_failure': False,    
        'email_on_retry': False,    
    },    
    dagrun_timeout=timedelta(hours=1),    
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=dag,
                                                   python_callable=lambda: time.sleep(400))

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
       aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    #   cluster_creator >> step_checker
    #   cluster_creator >> cluster_remover
    

what should be in step type for spark app .. I am facing issue that master type not set or unable to recognize yarn .. seems it is considering the application as simple jar rather than spark submit mode when using emrAddStepsOperator .. please find attached airflow dag , error and emr screenshot

amazon emr cloud console manually adding spark job as a step

After adding spark jar type step rather than custom jar step .. gives option to give spark submit args and main method args separately

step type can be streaming or spark app or custom jar

My Error Message:

> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
    at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

This is an example dag for a AWS EMR Pipeline.

Starting by creating a cluster, adding steps/operations, checking steps and finally when finished
terminating the cluster.

import time    
from airflow.operators.python import PythonOperator     
from datetime import timedelta    

from airflow import DAG    
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator    
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator    
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator     
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor    
from airflow.utils.dates import days_ago    

SPARK_STEPS = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['spark-submit',    
                    '--deploy-mode',    
                    'cluster',    
                    '--master',    
                    'yarn',    
                    '--class',    
                    'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',    
                '--deploy-mode',    
                'client',    
                '--master',    
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS3 = [    
    {    
        'Name': 'sadim_test3',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
            'MainClass': 'com.sadim.TestSadim',    
            'Args': ['spark-submit',     
                '--deploy-mode',    
                'client',     
                '--master',     
                'yarn',    
                '--conf',    
                'spark.yarn.submit.waitAppCompletion=true'],    
        },    
    }    
]    
SPARK_STEPS4 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                'spark-submit',    
                '--deploy-mode',    
                'client',    
                 '--master',    
                 'yarn',                    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
SPARK_STEPS5 = [    
    {    
        'Name': 'PerformETL',    
        'ActionOnFailure': 'CONTINUE',    
        'HadoopJarStep': {    
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
            #'MainClass': 'com.sadim.main',    
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
                    '--mode',    
                'DeltaLoadByDays',    
                '--noOfDaysBehindTodayForDeltaLoad',    
                '1',    
                '--s3InputPath',    
                's3://data-lake/documents/accountscore/categoriseddata/',    
                '--s3OutputPathcustomerCategoryWiseSummarizedViews',    
                's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
        },    
    }    
]    
JOB_FLOW_OVERRIDES = {    
    'Name': 'ob_emr_airflow_automation',    
    'ReleaseLabel': 'emr-6.6.0',    
    'LogUri': 's3://test-data/emr_logs/',    
    'Instances': {    
        'InstanceGroups': [    
            {    
                'Name': 'Master node',    
                'Market': 'ON_DEMAND',    
                'InstanceRole': 'MASTER',    
                'InstanceType': 'm5.xlarge',    
                'InstanceCount': 1    
            },    
            {    
                    'Name': "Slave nodes",    
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',    
                    'InstanceType': 'm5.xlarge',    
                    'InstanceCount': 1    
            }    
        ],    
        'Ec2SubnetId': 'subnet-03129248888a14196',    
        'Ec2KeyName': 'datalake-emr-nodes',    
        'KeepJobFlowAliveWhenNoSteps': True,    
        'TerminationProtected': False    
    },    
    'BootstrapActions': [    
                {    
                        'Name': 'Java11InstallBootstrap',    
                        'ScriptBootstrapAction': {    
                            'Path': 's3://test-data/jars/bootstrap.sh',    
                            'Args': [    
                            ]    
                        }    
                }    
    ],    
    'Configurations': [    
        {    
            "Classification":"spark-defaults",    
                    "Properties":{    
                    "spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' -    XX:MaxHeapFreeRatio=70",    
                    "spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time -    XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
                                }    
        }    
    ],    
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
    'ServiceRole': 'EMR_DefaultRole',    
}    

with DAG(    
    dag_id='emr_job_flow_manual_steps_dag_v6',    
    default_args={    
        'owner': 'airflow',    
        'depends_on_past': False,    
        'email': ['[email protected]'],    
        'email_on_failure': False,    
        'email_on_retry': False,    
    },    
    dagrun_timeout=timedelta(hours=1),    
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:

    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=dag,
                                                   python_callable=lambda: time.sleep(400))

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
       aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> step_adder >> step_checker >> cluster_remover

    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies created via `XComArgs`:
    #   cluster_creator >> step_checker
    #   cluster_creator >> cluster_remover
    

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

北方。的韩爷 2025-02-12 09:22:31

问题解决了。我们需要将命令运行罐作为emraddstepsoperator中的jar选项

SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            #'MainClass': 'com.sadim.main',
            'Args': [ 
            'spark-submit',
            '--deploy-mode',
                'cluster',
                 '--master',
                 'yarn',
                 '--class',
                    'com.sadim.DetectAndLoadOrphanTransactions',
                 's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2'
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]

The issue got fixed. We need to use the command-runner jar as jar option in emrAddStepsOperator and pass your specific ETL job jar inside args as

SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            #'MainClass': 'com.sadim.main',
            'Args': [ 
            'spark-submit',
            '--deploy-mode',
                'cluster',
                 '--master',
                 'yarn',
                 '--class',
                    'com.sadim.DetectAndLoadOrphanTransactions',
                 's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2'
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文