Component Gateway activation on Dataproc does not work with the Composer (Airflow) operator airflow.providers.google.cloud.operators.dataproc

Posted on 2025-01-09 11:14:18

I'm trying to execute the DAG below.
It seems that the operator that creates the Dataproc cluster does not enable the optional components needed for Jupyter Notebook and Anaconda.
I found this code here: Component Gateway with DataprocOperator on Airflow, and tried it, but it didn't solve the problem for me. I think that is because the Composer (Airflow) version used there is different from mine. My version is composer-2.0.0-preview.5, airflow-2.1.4.

The operator works perfectly when creating the cluster, but the cluster is created without the optional component that enables Jupyter Notebook.
Does anyone have any ideas that could help me?

from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator,DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())


default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date' :yesterday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),

}

dag = DAG(
    'teste-dag-3',catchup=False, default_args=default_args, schedule_interval=None)


# configure the optional components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
        return cluster_data


create_cluster=CustomDataprocClusterCreateOperator(
        dag=dag,
        task_id='start_cluster_example',
        cluster_name='teste-ge-{{ ds }}',
        project_id= "sandbox-coe",
        num_workers=2,
        num_masters=1,
        master_machine_type='n2-standard-8',
        worker_machine_type='n2-standard-8',
        worker_disk_size=500,
        master_disk_size=500,
        master_disk_type='pd-ssd',
        worker_disk_type='pd-ssd',
        image_version='1.5.56-ubuntu18',
        tags=['allow-dataproc-internal'],
        region="us-central1",
        zone='us-central1-f',#Variable.get('gc_zone'),
        storage_bucket = "bucket-dataproc-ge",
        labels = {'product' : 'sample-label'},
        service_account_scopes = ['https://www.googleapis.com/auth/cloud-platform'],
        #properties={"yarn:yarn.nodemanager.resource.memory-mb" : 15360,"yarn:yarn.scheduler.maximum-allocation-mb" : 15360},
        #subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
        retries= 1,
        retry_delay=timedelta(minutes=1)
    ) #starts a dataproc cluster


stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
    ) #stops a running dataproc cluster




create_cluster  >> stop_cluster_example


假装不在乎 2025-01-16 11:14:18

Edit:
After taking a deeper look, I found that you don't need a custom operator anymore.
The updated operator DataprocCreateClusterOperator supports enable_component_gateway and optional_components, so you can just set them directly:

from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
)

# PROJECT_ID and REGION are placeholders, assumed to be defined elsewhere in the DAG file.
CLUSTER_GENERATOR = ClusterGenerator(
    project_id=PROJECT_ID,
    region=REGION,
    # ... other cluster settings ...
    enable_component_gateway=True,
    optional_components=['JUPYTER', 'ANACONDA'],
).make()

DataprocCreateClusterOperator(
    # ... remaining operator arguments ...
    cluster_config=CLUSTER_GENERATOR,
)

You can check this example dag for more details.
You can view all possible parameters of ClusterGenerator in the source code.
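
If you would rather pass cluster_config as a plain dictionary than go through ClusterGenerator, the sketch below shows roughly what the two settings above correspond to. The helper name build_cluster_config is hypothetical, and the machine types and image version are copied from the question. The field names are an assumption based on the snake_case Dataproc v1 cluster config, so check them against your provider version before relying on them.

```python
def build_cluster_config(optional_components, enable_component_gateway=True):
    """Hypothetical helper: build a cluster_config dict fragment.

    Field names are assumed to follow the snake_case Dataproc v1 API.
    """
    return {
        "master_config": {"num_instances": 1, "machine_type_uri": "n2-standard-8"},
        "worker_config": {"num_instances": 2, "machine_type_uri": "n2-standard-8"},
        "software_config": {
            "image_version": "1.5.56-ubuntu18",
            # Optional components are requested here.
            "optional_components": list(optional_components),
        },
        # Component Gateway is switched on through the endpoint config.
        "endpoint_config": {"enable_http_port_access": enable_component_gateway},
    }


cluster_config = build_cluster_config(["JUPYTER", "ANACONDA"])
print(cluster_config["endpoint_config"])
```

A dictionary like this could then be passed as cluster_config to DataprocCreateClusterOperator in place of the ClusterGenerator output.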

Original answer:
The operator was rewritten (see PR). I think the issue is with your _build_cluster_data function: the rewritten operator uses snake_case keys rather than camelCase ones.

You should probably change your code to:

def _build_cluster_data(self):
    cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
    # Keys use snake_case here, unlike the camelCase keys in the question.
    cluster_data['config']['endpoint_config'] = {
        'enable_http_port_access': True
    }
    cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']  # redundant, see note 2
    return cluster_data

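A few notes:

  1. DataprocClusterCreateOperator, the base class of your CustomDataprocClusterCreateOperator, is deprecated. You should use DataprocCreateClusterOperator from the Google provider.

  2. You don't need to set cluster_data['config']['endpoint_config'] yourself. You can set the value directly by passing optional_components to the operator; see the source code: https://github.com/apache/airflow/blob/b2c0a921c155e82d1140029e6495594061945025/airflow/providers/google/cloud/example_dags/example_dataproc.py#L340-L346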
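
To make the key renaming in the original answer concrete, here is a minimal sketch that converts the camelCase keys used in the question into snake_case keys. The helpers camel_to_snake and snake_case_keys are hypothetical and not part of Airflow. They only illustrate the naming difference, and they are not a substitute for passing the options to the operator directly.

```python
import re


def camel_to_snake(name):
    """Convert a camelCase key such as 'endpointConfig' to 'endpoint_config'."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


def snake_case_keys(value):
    """Recursively rename dictionary keys; leave all values untouched."""
    if isinstance(value, dict):
        return {camel_to_snake(k): snake_case_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [snake_case_keys(v) for v in value]
    return value


# Keys as written in the question's custom operator.
old_style = {
    "endpointConfig": {"enableHttpPortAccess": True},
    "softwareConfig": {"optionalComponents": ["JUPYTER", "ANACONDA"]},
}
new_style = snake_case_keys(old_style)
print(new_style)
```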
