Airflow EmrAddStepsOperator unable to execute Spark shaded jar
What should the step type be for a Spark app? I am facing an issue where the master is not set or YARN is not recognized; it seems EMR treats the application as a simple JAR rather than running it in spark-submit mode when the step is added through EmrAddStepsOperator. Please find the Airflow DAG, the error, and the EMR screenshot below.
Screenshot (https://i.sstatic.net/oeflx.png): manually adding a Spark job as a step in the Amazon EMR cloud console, after choosing the "Spark application" step type rather than the "Custom JAR" step type and supplying the spark-submit args and the main-method args. In the console, the step type can be Streaming, Spark application, or Custom JAR.
My error message:
> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
This is an example DAG for an AWS EMR pipeline. It starts by creating a cluster, then adds steps/operations, checks the steps, and finally terminates the cluster when finished.
import time
from airflow.operators.python import PythonOperator
from datetime import timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago
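# NOTE: the SPARK_STEPS* variants below are successive attempts. In each one,
# 'Jar' points at the application jar itself, so EMR launches it through
# Hadoop's RunJar (visible in the stack trace above) and the 'spark-submit',
# '--master', 'yarn', ... strings in Args are passed verbatim to the jar's
# main method instead of being interpreted by spark-submit.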
SPARK_STEPS = [
{
'Name': 'PerformETL',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
#'MainClass': 'com.sadim.main',
'Args': ['spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
'--class',
'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
'--mode',
'DeltaLoadByDays',
'--noOfDaysBehindTodayForDeltaLoad',
'1',
'--s3InputPath',
's3://data-lake/documents/accountscore/categoriseddata/',
'--s3OutputPathcustomerCategoryWiseSummarizedViews',
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
},
}
]
SPARK_STEPS2 = [
{
'Name': 'sadim_test3',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
'MainClass': 'com.sadim.scalatestnadeem.Test',
'Args': ['spark-submit',
'--deploy-mode',
'client',
'--master',
'yarn',
'--conf',
'spark.yarn.submit.waitAppCompletion=true'],
},
}
]
SPARK_STEPS3 = [
{
'Name': 'sadim_test3',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',
'MainClass': 'com.sadim.TestSadim',
'Args': ['spark-submit',
'--deploy-mode',
'client',
'--master',
'yarn',
'--conf',
'spark.yarn.submit.waitAppCompletion=true'],
},
}
]
SPARK_STEPS4 = [
{
'Name': 'PerformETL',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
#'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
'spark-submit',
'--deploy-mode',
'client',
'--master',
'yarn',
'--mode',
'DeltaLoadByDays',
'--noOfDaysBehindTodayForDeltaLoad',
'1',
'--s3InputPath',
's3://data-lake/documents/accountscore/categoriseddata/',
'--s3OutputPathcustomerCategoryWiseSummarizedViews',
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
},
}
]
SPARK_STEPS5 = [
{
'Name': 'PerformETL',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
#'MainClass': 'com.sadim.main',
'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
'--mode',
'DeltaLoadByDays',
'--noOfDaysBehindTodayForDeltaLoad',
'1',
'--s3InputPath',
's3://data-lake/documents/accountscore/categoriseddata/',
'--s3OutputPathcustomerCategoryWiseSummarizedViews',
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
},
}
]
JOB_FLOW_OVERRIDES = {
'Name': 'ob_emr_airflow_automation',
'ReleaseLabel': 'emr-6.6.0',
'LogUri': 's3://test-data/emr_logs/',
'Instances': {
'InstanceGroups': [
{
'Name': 'Master node',
'Market': 'ON_DEMAND',
'InstanceRole': 'MASTER',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
},
{
'Name': "Slave nodes",
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',
'InstanceType': 'm5.xlarge',
'InstanceCount': 1
}
],
'Ec2SubnetId': 'subnet-03129248888a14196',
'Ec2KeyName': 'datalake-emr-nodes',
'KeepJobFlowAliveWhenNoSteps': True,
'TerminationProtected': False
},
'BootstrapActions': [
{
'Name': 'Java11InstallBootstrap',
'ScriptBootstrapAction': {
'Path': 's3://test-data/jars/bootstrap.sh',
'Args': [
]
}
}
],
'Configurations': [
{
"Classification":"spark-defaults",
"Properties":{
"spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' - XX:MaxHeapFreeRatio=70",
"spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time - XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
}
}
],
'JobFlowRole': 'DL_EMR_EC2_DefaultRole',
'ServiceRole': 'EMR_DefaultRole',
}
with DAG(
dag_id='emr_job_flow_manual_steps_dag_v6',
default_args={
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
},
dagrun_timeout=timedelta(hours=1),
start_date=days_ago(1),
schedule_interval='0 3 * * *',
tags=['example'],
) as dag:
# [START howto_operator_emr_manual_steps_tasks]
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
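# Optional buffer to let the cluster finish provisioning; note this task is
# defined but never wired into the dependency chain at the bottom.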
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
dag=dag,
python_callable=lambda: time.sleep(400))
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id=cluster_creator.output,
aws_conn_id='aws_default',
steps=SPARK_STEPS5,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=cluster_creator.output,
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=cluster_creator.output,
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker >> cluster_remover
# [END howto_operator_emr_manual_steps_tasks]
# Task dependencies created via `XComArgs`:
# cluster_creator >> step_checker
# cluster_creator >> cluster_remover
Answer: The issue got fixed. We need to use command-runner.jar as the 'Jar' option in EmrAddStepsOperator and pass the specific ETL job jar inside Args, as sketched below.
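A minimal sketch of the corrected step definition, reusing the jar path, class name, and job arguments from SPARK_STEPS above (the name SPARK_STEPS_FIXED is introduced here for illustration). command-runner.jar is what the EMR console's "Spark application" step type uses under the hood to invoke spark-submit:

SPARK_STEPS_FIXED = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            # command-runner.jar actually invokes spark-submit on the cluster,
            # so --deploy-mode/--master are honored; the application jar is
            # just another spark-submit argument.
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit',
                     '--deploy-mode', 'cluster',
                     '--master', 'yarn',
                     '--class', 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
                     # everything after the jar path is handed to the main method
                     '--mode', 'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad', '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]

Passing steps=SPARK_STEPS_FIXED to the EmrAddStepsOperator then submits the job through spark-submit on YARN, and the "A master URL must be set" error disappears because YARN provides the master.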