Apache Airflow: passing data from a BashOperator to a SparkSubmitOperator



I am trying to log in to server 100.18.10.182 and trigger my spark-submit job on server 100.18.10.36 from the .182 server in Apache Airflow. I have used a BashOperator (a shell script that SSHes into the 100.18.10.182 server), and for the spark-submit job I have used a SparkSubmitOperator as a downstream task of the BashOperator.
The BashOperator executes successfully, but the SparkSubmitOperator fails with:
Cannot execute: Spark submit

I think this is because I am unable to carry the SSH session (to the .182 server) over into the next SparkSubmitOperator, or it may be due to some other issue related to --jars or --packages; I am not sure.

I was thinking of using xcom_push to push some data from my BashOperator and xcom_pull to read it in the SparkSubmitOperator, but I am not sure how to pass it in a way that keeps the server logged in so that my SparkSubmitOperator gets triggered from that box itself.
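For reference, a minimal sketch of the XCom mechanics being described, assuming Airflow 1.x operators and the same dag object as in the code below. With xcom_push=True the BashOperator pushes the last line of its stdout, and a downstream operator can pull it through a Jinja template; the task ids and values here are placeholders, and application_args being a templated field of SparkSubmitOperator holds for recent versions of the operator:

from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Pushes the last line of stdout (here a hypothetical marker value) to XCom.
push_value = BashOperator(
    task_id='push_ssh_result',
    bash_command="ssh hardikgoel@100.18.10.182 'echo remote_ok'",
    xcom_push=True,
    dag=dag)

# Pulls the pushed value back through Jinja templating on a templated field.
submit = SparkSubmitOperator(
    task_id='submit_with_xcom_arg',
    application='/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar',
    application_args=["{{ ti.xcom_pull(task_ids='push_ssh_result') }}"],
    conn_id='spark_default',
    dag=dag)

push_value >> submit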

Airflow DAG code:

from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import paths
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# SSH into the remote box via the shell script below.
t2 = BashOperator(
    task_id='test_bash_operator',
    bash_command="/Users/hardikgoel/Downloads/Work/airflow_dir/shell_files/airflow_prod_ssh_script.sh ",
    dag=dag)

# Arguments for the spark-submit call issued by the SparkSubmitOperator.
t3_config = {
    'conf': {
        "spark.yarn.maxAppAttempts": "1",
        "spark.yarn.executor.memoryOverhead": "8"
    },
    'conn_id': 'spark_default',
    'packages': 'com.sparkjobs.SparkJobsApplication',
    'jars': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar firstJob',
    'driver_memory': '1g',
    'total_executor_cores': '21',
    'executor_cores': 7,
    'executor_memory': '48g'
}

t3 = SparkSubmitOperator(
    task_id='t3',
    **t3_config)

t2 >> t3
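For completeness, the snippet assumes a dag object defined earlier in the file; a minimal, hypothetical definition (the DAG id, schedule, and start date are placeholders) could look like:

from datetime import datetime
from airflow import DAG

dag = DAG(
    dag_id='spark_submit_via_ssh',    # hypothetical DAG id
    schedule_interval=None,           # run only when triggered manually
    start_date=datetime(2019, 1, 1),  # placeholder start date
    catchup=False)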

Shell Script code:

#!/bin/bash
USERNAME=hardikgoel
HOSTS="100.18.10.182"
SCRIPT="pwd; ls"

# Run the commands on the remote host and capture ssh's exit status immediately,
# before any other command overwrites it.
ssh -l ${USERNAME} ${HOSTS} "${SCRIPT}"
SSH_STATUS=$?
echo "SSHed successfully"
if [ ${SSH_STATUS} -eq 0 ]; then
  echo "successful"
fi


Comments (1)

写给空气的情书 2025-01-26 17:52:50


Combine SSH and Spark submit commands within the same BashOperator:

t2 = BashOperator(
    task_id='ssh_and_spark_submit',
    # Runs spark-submit on the remote box inside the SSH session; the ${...}
    # placeholders must be resolvable in the task's environment.
    bash_command="ssh -tt ${USERNAME}@${HOSTS} '/path/to/spark-submit --jars ${JARS} --packages ${PACKAGES} ${SPARK_SUBMIT_ARGS}'",
    dag=dag
)
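A practical note on those placeholders: ${USERNAME}, ${HOSTS}, ${JARS}, ${PACKAGES} and ${SPARK_SUBMIT_ARGS} must exist in the environment of the worker that executes the task. A sketch of supplying them through the BashOperator's env parameter follows; all values are placeholders, and keep in mind that when env is given it replaces, rather than extends, the inherited environment:

t2 = BashOperator(
    task_id='ssh_and_spark_submit',
    bash_command="ssh -tt ${USERNAME}@${HOSTS} "
                 "'/path/to/spark-submit --jars ${JARS} ${SPARK_SUBMIT_ARGS}'",
    env={  # placeholder values; real ones could come from Airflow Variables
        'USERNAME': 'hardikgoel',
        'HOSTS': '100.18.10.182',
        'JARS': '/var/spark/spark-jobs-0.0.1-SNAPSHOT-1/spark-jobs-0.0.1-SNAPSHOT.jar',
        'SPARK_SUBMIT_ARGS': '--class com.sparkjobs.SparkJobsApplication',
    },
    dag=dag)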

You can also use XCom to gate the Spark submit on the SSH result, for example with a ShortCircuitOperator:

# Airflow 1.x import paths for the extra operators used below
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

t2 = BashOperator(
    task_id='ssh_and_push_success',
    # With xcom_push=True the last line of stdout ('success') is pushed to XCom.
    bash_command="ssh -tt ${USERNAME}@${HOSTS} 'pwd; ls' && echo 'success'",
    xcom_push=True,
    dag=dag
)

def trigger_spark_if_ssh_success(**context):
    return context['ti'].xcom_pull(task_ids='ssh_and_push_success') == 'success'

# ShortCircuitOperator skips everything downstream when the callable returns a falsy value.
check_ssh = ShortCircuitOperator(
    task_id='check_ssh_success',
    python_callable=trigger_spark_if_ssh_success,
    provide_context=True,  # Airflow 1.x: pass the task context into the callable
    dag=dag
)

t3 = SparkSubmitOperator(
    task_id='spark_submit_if_ssh_success',
    dag=dag,
    **t3_config
)

# Optionally kick off another DAG once the Spark job finishes.
trigger_next = TriggerDagRunOperator(
    task_id='trigger_downstream_dag', trigger_dag_id="downstream_dag_id", dag=dag)

t2 >> check_ssh >> t3 >> trigger_next
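With the tasks wired this way, the ShortCircuitOperator skips spark_submit_if_ssh_success (and anything downstream of it) whenever the SSH task did not push 'success'. Also worth keeping in mind: SparkSubmitOperator runs spark-submit on the Airflow worker via the spark_default connection rather than inside the SSH session, which is why the first variant moves the spark-submit call into the ssh command itself.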