KubernetesPodOperator unable to access Airflow connection stored in Google Secrets Manager
I am using Composer version 2.0.0 (Airflow 2.1.4) and have created a KubernetesPodOperator that tries to access an Airflow connection stored in Google Secrets Manager, but it is not able to locate the credentials (see the exceptions below). I have also tried passing the Airflow connection secret to env_vars in the KubernetesPodOperator, and passing it to the pod as a Kubernetes Secret, but still no luck.
Below is my code for both cases:
# DAG-side imports (Airflow 2.1.x with the cncf.kubernetes provider)
from airflow.hooks.base import BaseHook
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

# Resolve the connection URI on the Airflow side and pass it to the pod as an env var
aws_uri = BaseHook.get_connection('aws_conn').get_uri()

download_file = KubernetesPodOperator(
    task_id="download_file_s3_to_gcs",
    dag=dag,
    name="download_file_s3_to_gcs",
    namespace=NAMESPACE,
    in_cluster=True,
    image=IMAGE_NAME,
    arguments=[
        "python3",
        "%s" % FILENAME,
    ],
    service_account_name=K_SERVICE_ACCOUNT,
    env_vars=[
        k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID),
        k8s.V1EnvVar(name="aws_conn_id", value=aws_uri),
    ],
    is_delete_operator_pod=True,
)
from airflow.kubernetes.secret import Secret

secret_aws_conn_id = Secret(
    deploy_type='env',
    deploy_target='AWS_CONN_ID',
    secret='aws-conn-id',  # my kubernetes secret
    key='aws-conn-key',
)

download_file = KubernetesPodOperator(
    task_id="download_file_s3_to_gcs",
    dag=dag,
    name="download_file_s3_to_gcs",
    namespace=NAMESPACE,
    in_cluster=True,
    image=IMAGE_NAME,
    arguments=[
        "python3",
        "%s" % FILENAME,
    ],
    service_account_name=K_SERVICE_ACCOUNT,
    secrets=[secret_aws_conn_id],
    env_vars=[k8s.V1EnvVar(name="gcp_conn_id", value=GCP_CONN_ID)],
    is_delete_operator_pod=True,
)
Although I can print the connection URI correctly, the job always throws the exceptions below:
[2022-04-01 08:01:09,563] {pod_manager.py:197} INFO - botocore.exceptions.NoCredentialsError: Unable to locate credentials
[2022-04-01 08:01:09,374] {pod_manager.py:197} INFO - sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: connection
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - [SQL: SELECT connection.password AS connection_password, connection.extra AS connection_extra, connection.id AS connection_id, connection.conn_id AS connection_conn_id, connection.conn_type AS connection_conn_type, connection.description AS connection_description, connection.host AS connection_host, connection.schema AS connection_schema, connection.login AS connection_login, connection.port AS connection_port, connection.is_encrypted AS connection_is_encrypted, connection.is_extra_encrypted AS connection_is_extra_encrypted
[2022-04-01 08:01:09,375] {pod_manager.py:197} INFO - FROM connection
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO - WHERE connection.conn_id = ?
[2022-04-01 08:01:09,376] {pod_manager.py:197} INFO - LIMIT ? OFFSET ?]
[2022-04-01 08:01:09,354] {pod_manager.py:197} INFO - [2022-04-01 08:01:09,297] {connection.py:407} ERROR - Unable to retrieve connection from secrets backend (MetastoreBackend). Checking subsequent secrets backend.
Can someone please help me resolve this issue?
1 Answer
You can't pass Airflow connections from Airflow to the pod directly.
The error log shows that you have Airflow operator/hook code running inside the pod. That code calls the secrets backend (Secret Manager) or the Airflow metadata database to retrieve credentials for the connection ID, and as the log shows, the pod cannot reach the metadata database: it ends up querying a default local SQLite database that has no connection table.
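If you want to keep the env_vars approach from your first snippet, the script inside the container has to consume the URI itself rather than going through Airflow hooks. A minimal sketch, assuming the aws_conn_id env var holds a URI of the form aws://ACCESS_KEY:SECRET_KEY@ (what get_uri() typically returns for an AWS connection) and using placeholder bucket/object names:

# Container-side sketch: build the S3 client from the connection URI that the
# DAG passed in via env_vars, instead of calling Airflow hooks inside the pod.
# The env var name, bucket and object key below are examples, not your values.
import os
from urllib.parse import unquote, urlparse

import boto3

uri = urlparse(os.environ["aws_conn_id"])
s3 = boto3.client(
    "s3",
    aws_access_key_id=unquote(uri.username or ""),
    aws_secret_access_key=unquote(uri.password or ""),
)
s3.download_file("my-source-bucket", "path/to/file.csv", "/tmp/file.csv")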
I would recommend passing the GCP credentials as a Kubernetes secret and using the GCS Python client inside the container. The container itself should not depend on Airflow; Airflow only schedules it.
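As a sketch of that recommendation (the secret name gcp-sa-key, the key.json file name, the mount path and the bucket/object names are placeholders, not anything from your setup):

# DAG side (sketch): mount a service-account key from a Kubernetes secret and
# point the Google client libraries at it via GOOGLE_APPLICATION_CREDENTIALS.
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

gcp_sa_key = Secret(
    deploy_type='volume',
    deploy_target='/var/secrets/google',  # mount path inside the container
    secret='gcp-sa-key',                  # Kubernetes secret holding key.json (example name)
    key='key.json',
)

upload_file = KubernetesPodOperator(
    task_id="upload_file_to_gcs",
    name="upload_file_to_gcs",
    namespace=NAMESPACE,
    image=IMAGE_NAME,
    secrets=[gcp_sa_key],
    env_vars=[
        k8s.V1EnvVar(
            name="GOOGLE_APPLICATION_CREDENTIALS",
            value="/var/secrets/google/key.json",
        )
    ],
    is_delete_operator_pod=True,
)

# Container side (a separate script baked into IMAGE_NAME): no Airflow imports;
# google-cloud-storage picks up GOOGLE_APPLICATION_CREDENTIALS automatically.
from google.cloud import storage

client = storage.Client()
client.bucket("my-gcs-bucket").blob("path/in/bucket").upload_from_filename("/tmp/file.csv")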