GCP Composer 2 (Airflow 2) Dataproc Operator - passing packages to PYSPARK_JOB

Posted on 2025-02-03 18:46:26


I'm using GCP Composer 2 to schedule PySpark (Structured Streaming) jobs;
the PySpark code reads from and writes to Kafka.

The DAG uses the operators DataprocCreateClusterOperator (creates the Dataproc cluster),
DataprocSubmitJobOperator (runs the PySpark job), and DataprocDeleteClusterOperator (deletes the Dataproc cluster).

In the code below, I'm passing the jars and the files (certs/config files) required to run the PySpark code that reads from and writes to Kafka:


PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "jar_file_uris" : ["gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar",
                               'gs://dataproc-spark-jars/bson-4.0.5.jar','gs://dataproc-spark-jars/mongo-spark-connector_2.12-3.0.2.jar','gs://dataproc-spark-jars/mongodb-driver-core-4.0.5.jar',
                               'gs://dataproc-spark-jars/mongodb-driver-sync-4.0.5.jar','gs://dataproc-spark-jars/spark-avro_2.12-3.1.2.jar','gs://dataproc-spark-jars/spark-bigquery-with-dependencies_2.12-0.23.2.jar',
                           'gs://dataproc-spark-jars/spark-token-provider-kafka-0-10_2.12-3.2.0.jar','gs://dataproc-spark-jars/htrace-core4-4.1.0-incubating.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-jars/spark-sql-kafka-0-10_2.12-3.2.0.jar','gs://dataproc-spark-jars/hadoop-client-runtime-3.3.1.jar','gs://dataproc-spark-jars/hadoop-client-3.3.1.jar','gs://dataproc-spark-configs/kafka-clients-3.2.0.jar'],
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12']
        }
}
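
For context, here is a minimal sketch of how the staged files above could be referenced from the PySpark code. It assumes, per the Dataproc job API's description of file_uris, that the files are placed in the job's working directory and can therefore be referenced by bare file name; the bootstrap servers, topic, and keystore/truststore passwords are placeholders, and the actual job code is not shown in the question.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-ssl-sketch").getOrCreate()

# Hypothetical Kafka read: the .p12 certs passed via file_uris land in the
# job's working directory, so the standard Kafka SSL options can point at them
# by file name. Brokers, topic, and passwords below are placeholders.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "BOOTSTRAP_SERVERS")          # placeholder
    .option("subscribe", "SOME_TOPIC")                               # placeholder
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.type", "PKCS12")
    .option("kafka.ssl.truststore.location", "versa-kafka-gke-ca.p12")
    .option("kafka.ssl.truststore.password", "TRUSTSTORE_PASSWORD")  # placeholder
    .option("kafka.ssl.keystore.type", "PKCS12")
    .option("kafka.ssl.keystore.location", "syslog-vani.p12")
    .option("kafka.ssl.keystore.password", "KEYSTORE_PASSWORD")      # placeholder
    .load()
)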

path = "gs://dataproc-spark-configs/pip_install.sh"

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone="us-east1-b",
    master_machine_type="n1-standard-4",
    worker_machine_type="n1-standard-4",
    num_workers=4,
    storage_bucket="dataproc-spark-logs",
    init_actions_uris=[path],
    metadata={'PIP_PACKAGES': 'pyyaml requests pandas openpyxl kafka-python'},
).make()

with models.DAG(
        'UsingComposer2',
        # Continue to run DAG twice per day
        default_args=default_dag_args,
        schedule_interval='0 0/12 * * *',
        catchup=False,
        ) as dag:


    create_dataproc_cluster = DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name="composer2",
        region=REGION,
        cluster_config=CLUSTER_GENERATOR_CONFIG
    )

    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )

    delete_dataproc_cluster = DataprocDeleteClusterOperator(
        task_id="delete_dataproc_cluster",
        project_id=PROJECT_ID,
        cluster_name=CLUSTER_NAME,
        region=REGION
    )


create_dataproc_cluster >> run_dataproc_spark >> delete_dataproc_cluster


Question is - how do I pass packages instead of the individual jars for spark-kafka?
When I do a spark-submit, I can pass packages with --packages; how do I do the same with Composer/Airflow?

Sample spark-submit command, where I pass the spark-sql-kafka and mongo-spark-connector packages:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 /Users/karanalang/PycharmProjects/Kafka/StructuredStreaming-KafkaConsumer-insignts.py
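
For comparison, here is a minimal sketch of how the same --packages list might be expressed in the Dataproc job dict: Spark accepts these coordinates via the spark.jars.packages property, and the Dataproc PySpark job's properties field is a map of Spark property names to values. The dict name below is illustrative, and this is a sketch of the idea rather than code confirmed against the setup above.

PYSPARK_JOB_WITH_PACKAGES = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        # spark.jars.packages is the property equivalent of spark-submit --packages;
        # the coordinates mirror the spark-submit command above.
        "properties": {
            "spark.jars.packages": (
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,"
                "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2"
            )
        },
    },
}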

TIA!

Update:
Based on @Anjela B's suggestion, I tried the following, but it does not work.

Changes to PYSPARK_JOB to pass the packages:

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {
        "main_python_file_uri": PYSPARK_URI,
        "properties": { #you can use this field to pass other properties
            "org.apache.spark": "spark-sql-kafka-0-10_2.12:3.1.3",
            "org.mongodb.spark": "mongo-spark-connector_2.12:3.0.2"
        },
        "file_uris":['gs://kafka-certs/versa-kafka-gke-ca.p12','gs://kafka-certs/syslog-vani.p12',
                     'gs://kafka-certs/alarm-compression-user.p12','gs://kafka-certs/appstats-user.p12',
                     'gs://kafka-certs/insights-user.p12','gs://kafka-certs/intfutil-user.p12',
                     'gs://kafka-certs/reloadpred-chkpoint-user.p12','gs://kafka-certs/reloadpred-user.p12',
                     'gs://dataproc-spark-configs/topic-customer-map.cfg','gs://dataproc-spark-configs/params.cfg','gs://kafka-certs/issues-user.p12','gs://kafka-certs/anomaly-user.p12']
        }
}

Error:

22/06/17 22:57:28 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1655505629376_0004
22/06/17 22:57:29 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at versa-insights2-m/10.142.0.70:8030
22/06/17 22:57:30 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
Traceback (most recent call last):
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 442, in <module>
    sys.exit(main())
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 433, in main
    main_proc = insightGen()
  File "/tmp/8991c714-7036-45ff-b61b-ece54cfffc51/alarm_insights.py", line 99, in __init__
    self.all_DF = self.spark.read \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o63.load.
: java.lang.ClassNotFoundException: Failed to find data source: mongo. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:692)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:746)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:666)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:666)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
    ... 14 more 


Comments (1)

花辞树 2025-02-10 18:46:26


You may use the following code to pass the configuration:

import datetime

from airflow import models
from airflow.operators import bash
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
PYSPARK_JOB = {
    "pyspark_job": {
      "main_python_file_uri": 
        "gs://<bucket>/20220606.py", #this field is for .py packages
      "properties": { #you can use this field to pass other properties
        "org.apache.spark": "spark-sql-kafka-0-10_2.12:3.2.0",
        "org.mongodb.spark": "mongo-spark-connector_2.12:3.0.2"
      },
      "python_file_uris": ["gs://<bucket>/20220606.py"]
    },
    "reference": {
      "project_id": "<project_id>"
    },
    "placement": {
      "cluster_name": "<cluster_name>"
    }
  }


REGION = "us-central1"
PROJECT_ID = "<project_id>"

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_quickstart',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')
    
    
    run_dataproc_spark = DataprocSubmitJobOperator(
        task_id="run_dataproc_spark",
        job=PYSPARK_JOB,
        location=REGION,
        project_id=PROJECT_ID,
    )
    print_dag_run_conf >> run_dataproc_spark

I followed this PySpark Job Documentation to find out which field to use to pass the required packages.

Airflow DAG logs:

*** Reading remote log from gs://us-central1-case-20220331-fde8f6be-bucket/logs/composer_quickstart/run_dataproc_spark/2022-06-06T06:53:24.637504+00:00/1.log.
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [queued]>
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [queued]>
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1239} INFO - 
--------------------------------------------------------------------------------
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1240} INFO - Starting attempt 1 of 2
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2022-06-06, 06:53:39 UTC] {taskinstance.py:1260} INFO - Executing <Task(DataprocSubmitJobOperator): run_dataproc_spark> on 2022-06-06 06:53:24.637504+00:00
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:52} INFO - Started process 65510 to run task
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'composer_quickstart', 'run_dataproc_spark', 'manual__2022-06-06T06:53:24.637504+00:00', '--job-id', '21439', '--raw', '--subdir', 'DAGS_FOLDER/20220606_1.py', '--cfg-path', '/tmp/tmp7p1eyqqm', '--error-file', '/tmp/tmpdr2m4rwe']
[2022-06-06, 06:53:39 UTC] {standard_task_runner.py:77} INFO - Job 21439: Subtask run_dataproc_spark
[2022-06-06, 06:53:41 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: composer_quickstart.run_dataproc_spark manual__2022-06-06T06:53:24.637504+00:00 [running]> on host airflow-worker-7b5f8fc749-pd8f9
[2022-06-06, 06:53:44 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=
AIRFLOW_CTX_DAG_OWNER=Composer Example
AIRFLOW_CTX_DAG_ID=composer_quickstart
AIRFLOW_CTX_TASK_ID=run_dataproc_spark
AIRFLOW_CTX_EXECUTION_DATE=2022-06-06T06:53:24.637504+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-06T06:53:24.637504+00:00
[2022-06-06, 06:53:44 UTC] {dataproc.py:1878} INFO - Submitting job
[2022-06-06, 06:53:44 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-06-06, 06:53:45 UTC] {dataproc.py:1890} INFO - Job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 submitted successfully.
[2022-06-06, 06:53:45 UTC] {dataproc.py:1903} INFO - Waiting for job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 to complete
[2022-06-06, 06:54:16 UTC] {dataproc.py:1907} INFO - Job e7e800e7-fbfd-45e0-8021-eca4e2a7a377 completed successfully.
[2022-06-06, 06:54:16 UTC] {taskinstance.py:1268} INFO - Marking task as SUCCESS. dag_id=composer_quickstart, task_id=run_dataproc_spark, execution_date=20220606T065324, start_date=20220606T065339, end_date=20220606T065416
[2022-06-06, 06:54:16 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-06, 06:54:16 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Submitted job: (screenshot of the submitted Dataproc job omitted)
