Component gateway activation on Dataproc not working with the Composer (Airflow) operator airflow.providers.google.cloud.operators.dataproc
I am trying to run the DAG below. The operator that creates the Dataproc cluster does not seem to enable the optional components needed for Jupyter Notebook and Anaconda. I found code in this answer: Component Gateway with DataprocOperator on Airflow, which tries to solve this, but it did not work for me; I think the Composer (Airflow) version is different here. My version is composer-2.0.0-preview.5 with airflow-2.1.4.
The operator creates the cluster just fine, but it does not create it with the optional components that enable Jupyter Notebook. Does anyone have an idea that could help?
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator, DataprocClusterDeleteOperator, DataProcSparkOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

yesterday = datetime.combine(datetime.today() - timedelta(1),
                             datetime.min.time())

default_args = {
    'owner': 'teste3',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'teste-dag-3', catchup=False, default_args=default_args, schedule_interval=None)

# configure the optional components
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def __init__(self, *args, **kwargs):
        super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        cluster_data['config']['endpointConfig'] = {
            'enableHttpPortAccess': True
        }
        cluster_data['config']['softwareConfig']['optionalComponents'] = ['JUPYTER', 'ANACONDA']
        return cluster_data

create_cluster = CustomDataprocClusterCreateOperator(
    dag=dag,
    task_id='start_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    num_workers=2,
    num_masters=1,
    master_machine_type='n2-standard-8',
    worker_machine_type='n2-standard-8',
    worker_disk_size=500,
    master_disk_size=500,
    master_disk_type='pd-ssd',
    worker_disk_type='pd-ssd',
    image_version='1.5.56-ubuntu18',
    tags=['allow-dataproc-internal'],
    region="us-central1",
    zone='us-central1-f',  # Variable.get('gc_zone'),
    storage_bucket="bucket-dataproc-ge",
    labels={'product': 'sample-label'},
    service_account_scopes=['https://www.googleapis.com/auth/cloud-platform'],
    # properties={"yarn:yarn.nodemanager.resource.memory-mb": 15360, "yarn:yarn.scheduler.maximum-allocation-mb": 15360},
    # subnetwork_uri="projects/project-id/regions/us-central1/subnetworks/dataproc-subnet",
    retries=1,
    retry_delay=timedelta(minutes=1)
)  # starts a dataproc cluster

stop_cluster_example = DataprocClusterDeleteOperator(
    dag=dag,
    task_id='stop_cluster_example',
    cluster_name='teste-ge-{{ ds }}',
    project_id="sandbox-coe",
    region="us-central1",
)  # stops a running dataproc cluster

create_cluster >> stop_cluster_example
Edit:

After taking a deeper look, you no longer need a custom operator. The updated operator DataprocCreateClusterOperator has enable_component_gateway and optional_components, so you can just set them directly. You can check the provider's example DAG for more details, and you can view all possible parameters of ClusterGenerator in the source code.
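For reference, a minimal sketch of that direct approach, assuming a provider release in which ClusterGenerator exposes both parameters as described above (the cluster values mirror the question's DAG, and dag is the DAG object defined there):

from datetime import timedelta

from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
)

# Build the cluster dict with ClusterGenerator instead of subclassing the
# operator; optional_components and enable_component_gateway replace the
# manual endpointConfig/softwareConfig patching from the question.
CLUSTER_CONFIG = ClusterGenerator(
    project_id="sandbox-coe",
    num_masters=1,
    num_workers=2,
    master_machine_type="n2-standard-8",
    worker_machine_type="n2-standard-8",
    master_disk_type="pd-ssd",
    worker_disk_type="pd-ssd",
    master_disk_size=500,
    worker_disk_size=500,
    image_version="1.5.56-ubuntu18",
    storage_bucket="bucket-dataproc-ge",
    optional_components=["JUPYTER", "ANACONDA"],  # components to install
    enable_component_gateway=True,                # expose the component web UIs
).make()

create_cluster = DataprocCreateClusterOperator(
    dag=dag,
    task_id="start_cluster_example",
    cluster_name="teste-ge-{{ ds }}",
    project_id="sandbox-coe",
    region="us-central1",
    cluster_config=CLUSTER_CONFIG,
)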
Original answer:

The operator was re-written (see PR). I think the issue is with your _build_cluster_data function. You probably should change your code to:
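The code block from the original answer did not survive the page extraction; what follows is a plausible reconstruction, assuming (as the notes below suggest) that the rewritten operator builds its request with the snake_case field names expected by the Dataproc client rather than the REST API's camelCase keys:

class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):

    def _build_cluster_data(self):
        cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
        # snake_case keys: the rewritten operator passes this dict to the
        # google-cloud-dataproc client, which uses protobuf field names,
        # so the camelCase keys from the question are silently ignored.
        cluster_data['config']['endpoint_config'] = {
            'enable_http_port_access': True
        }
        cluster_data['config']['software_config']['optional_components'] = ['JUPYTER', 'ANACONDA']
        return cluster_data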
A few notes:

CustomDataprocClusterCreateOperator subclasses a deprecated contrib operator. You should use DataprocCreateClusterOperator from the Google provider instead.

You don't need to set cluster_data['config']['endpoint_config'] yourself; you can set the value directly by passing optional_components to the operator, see the example in the source code: https://github.com/apache/airflow/blob/b2c0a921c155e82d1140029e6495594061945025/airflow/providers/google/cloud/example_dags/example_dataproc.py#L340-L346