EMR - Airflow running a Scala jar file: airflow.exceptions.AirflowException

Posted 2025-01-19 23:44:31

I am trying to run a Scala jar file from Airflow using EMR; the jar is designed to read from SQL Server (via mssql-jdbc) and PostgreSQL.

From Airflow, I'm able to create the cluster.

My SPARK_STEPS looks like this:

SPARK_STEPS = [
    {
        'Name': 'Trigger_Source_Target',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', 
                     '--master', 'yarn', 
                     '--jars', '/mnt/MyScalaImport.jar', 
                     '--class', 'org.classname', 
                     's3://path/SNAPSHOT.jar', 
                     'SQL_Pwd', 'PostgreSQL_PWD', 'loadtype'],
        }
    }
]
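
For context, this is roughly how these pieces get wired into the DAG. A minimal sketch, assuming a recent apache-airflow-providers-amazon package; create_cluster and add_steps are illustrative task names, while watch_step is the sensor referenced below:

from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Create the cluster from JOB_FLOW_OVERRIDES (defined below).
create_cluster = EmrCreateJobFlowOperator(
    task_id='create_cluster',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

# Submit SPARK_STEPS to the newly created cluster.
add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    steps=SPARK_STEPS,
)

# watch_step polls the step until it finishes; this is the task that
# goes red when spark-submit exits non-zero.
watch_step = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
)

create_cluster >> add_steps >> watch_step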

After this, I have JOB_FLOW_OVERRIDES defined:

JOB_FLOW_OVERRIDES = {
    "Name": "pfdt-cluster-airflow",
    "LogUri": "s3://path/elasticmapreduce/",
    "ReleaseLabel": "emr-6.4.0",
    "Applications": [
        {"Name": "Spark"},
    ],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
        'Ec2KeyName': 'pem_file_name',
        "Ec2SubnetId": "subnet-123"
    },
    'BootstrapActions': [
        {
            'Name': 'import custom Jars',
            'ScriptBootstrapAction': {
                'Path': 's3://path/subpath/copytoolsjar.sh',
                'Args': []
            }
        }
    ],
    'Configurations': [
      {
        'Classification': 'spark-defaults',
        'Properties': {
          'spark.jars': 's3://jar_path/mssql-jdbc-8.4.1.jre8.jar'
         }
      }
    ],
    "VisibleToAllUsers": True,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
    "Tags": [
        {"Key": "Environment", "Value": "Development"},
    ],
}
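
One thing worth noting about the Configurations block above: spark.jars in the spark-defaults classification takes a comma-separated list, so both JDBC drivers could be declared there straight from S3 rather than only the mssql one. A sketch, reusing the bucket paths from this question:

'Configurations': [
    {
        'Classification': 'spark-defaults',
        'Properties': {
            # spark.jars is a comma-separated list; Spark fetches the
            # jars and adds them to the driver and executor classpaths.
            'spark.jars': 's3://path/subpath/mssql-jdbc-8.4.1.jre8.jar,'
                          's3://path/subpath/postgresql-42.2.24.jar'
        }
    }
],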

To copy the Scala .jar file from S3 to the local filesystem, I have a shell script that does the work, at s3://path/subpath/copytoolsjar.sh:

aws s3 cp s3://path/SNAPSHOT.jar /mnt/MyScalaImport.jar

On triggering the DAG, it fails at the watch_step node.

The error I'm getting, from stderr.gz, is:

22/04/08 13:38:23 INFO CodeGenerator: Code generated in 25.5907 ms
Exception in thread "main" java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:315)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:108)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:38)

How do I resolve this issue? I have my jars at:

s3://path/subpath/mssql-jdbc-8.4.1.jre8.jar

s3://path/subpath/postgresql-42.2.24.jar
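
"No suitable driver" comes from the java.sql.DriverManager.getDriver(url) call visible in the trace: when the Scala code doesn't pass an explicit "driver" option to the JDBC reader, Spark falls back to DriverManager, and DriverManager typically cannot see jars shipped with --jars, because those are loaded only after the JVM starts. A sketch of the usual shape of a fix (not a confirmed one), passing both driver jars at submit time with everything else unchanged:

'Args': ['spark-submit',
         '--master', 'yarn',
         # --jars takes a comma-separated list and accepts S3 URIs on
         # EMR, so the drivers need not be pre-copied to /mnt.
         '--jars', ('/mnt/MyScalaImport.jar,'
                    's3://path/subpath/mssql-jdbc-8.4.1.jre8.jar,'
                    's3://path/subpath/postgresql-42.2.24.jar'),
         '--class', 'org.classname',
         's3://path/SNAPSHOT.jar',
         'SQL_Pwd', 'PostgreSQL_PWD', 'loadtype'],

Even with the jars shipped, the Scala reader may still need the driver class named explicitly, e.g. .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") for SQL Server and .option("driver", "org.postgresql.Driver") for PostgreSQL, so that Spark registers the driver itself instead of asking DriverManager.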

Comments (1)

雨落□心尘 2025-01-26 23:44:31


To get the jar files (mssql-jdbc-8.4.1.jre8.jar, postgresql-42.2.24.jar) onto the cluster nodes:

In the bootstrap step:

'BootstrapActions': [
    {
        'Name': 'import custom Jars',
        'ScriptBootstrapAction': {
            'Path': 's3://path/subpath/copytoolsjar.sh',
            'Args': []
        }
    }
]

In the copytoolsjar.sh file, write the command as:

aws s3 cp s3://path/SNAPSHOT.jar /mnt/MyScalaImport.jar && sudo aws s3 cp s3://path/subpath/mssql-jdbc-8.4.1.jre8.jar /usr/lib/spark/jars/ && sudo aws s3 cp s3://path/subpath/postgresql-42.2.24.jar /usr/lib/spark/jars/

That will do the job.
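
Assuming the copy succeeds, this works because /usr/lib/spark/jars is on the Spark JVM's classpath from startup on EMR, so java.sql.DriverManager can discover the drivers there, whereas jars shipped with --jars are loaded only after the JVM is already up. The bootstrap-free alternatives sketched under the question (naming the driver class with .option("driver", ...) or listing the jars in spark.jars) trade the extra copy step for a config change.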
