无法从 Vault 秘密后端获取 dags 中的气流变量

发布于 2025-01-16 20:44:17 字数 4528 浏览 2 评论 0原文

我正在将气流 1.10.10 与 Vault 结合使用,并尝试检索存储在 Vault 中的变量。我能够检索连接,但不能检索变量。

气流配置文件包含:

airflow.cfg

[secrets]
backend = airflow.contrib.secrets.hashicorp_vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "secret", "url": "http://127.0.0.1:8200", "auth_type":"token", "token":"token"}

我在Vault侧运行以下命令:

vault login token=token
vault secrets enable -path=secret -version=2 kv
vault kv put secret/connections/smtp_default conn_uri=smtps://user:[email protected]:465
vault kv put secret/variables/auth_KEY_vault test=test123
vault secrets list
vault auth list

我运行以下dag来测试连接和变量的集成:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.hooks.base_hook import BaseHook


def get_secrets():
    conn = BaseHook.get_connection('smtp_default')
    var2 = Variable.get("auth_KEY_vault")
    print(var2)
    print(f"Password: {conn.password}, Login: {conn.login}, URI: {conn.get_uri()}, Host: {conn.host}")


with DAG('example_secrets_dags', start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    test_task = PythonOperator(
        task_id='test-task',
        python_callable=get_secrets
    )

导致以下错误:

*** Reading local file: /usr/local/airflow/logs/example_secrets_dags/test-task/2022-03-25T07:42:44.764744+00:00/1.log
[2022-03-25 07:42:54,964] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [queued]>
[2022-03-25 07:42:55,018] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [queued]>
[2022-03-25 07:42:55,022] {{taskinstance.py:879}} INFO - 
--------------------------------------------------------------------------------
[2022-03-25 07:42:55,023] {{taskinstance.py:880}} INFO - Starting attempt 1 of 1
[2022-03-25 07:42:55,024] {{taskinstance.py:881}} INFO - 
--------------------------------------------------------------------------------
[2022-03-25 07:42:55,089] {{taskinstance.py:900}} INFO - Executing <Task(PythonOperator): test-task> on 2022-03-25T07:42:44.764744+00:00
[2022-03-25 07:42:55,098] {{standard_task_runner.py:53}} INFO - Started process 1132 to run task
[2022-03-25 07:42:55,439] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [running]> 5d5d845cb830
[2022-03-25 07:42:55,547] {{logging_mixin.py:112}} INFO - [2022-03-25 07:42:55,546] {base_hook.py:87} INFO - Using connection to: id: smtp_default. Host: relay.example.com, Port: 465, Schema: , Login: user, Password: XXXXXXXX, extra: None
[2022-03-25 07:42:55,573] {{taskinstance.py:1145}} ERROR - 'Variable auth_KEY_vault does not exist'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/vault_integration_test/vault_integration.py", line 10, in get_secrets
    var2 = Variable.get("auth_KEY_vault")
  File "/usr/local/lib/python3.6/site-packages/airflow/models/variable.py", line 118, in get
    raise KeyError('Variable {} does not exist'.format(key))
KeyError: 'Variable auth_KEY_vault does not exist'
[2022-03-25 07:42:55,586] {{taskinstance.py:1202}} INFO - Marking task as FAILED.dag_id=example_secrets_dags, task_id=test-task, execution_date=20220325T074244, start_date=20220325T074254, end_date=20220325T074255
[2022-03-25 07:43:04,875] {{logging_mixin.py:112}} INFO - [2022-03-25 07:43:04,872] {local_task_job.py:103} INFO - Task exited with return code 1

关于如何从Vault获取变量的任何想法请 ?

预先感谢您的帮助

I am using airflow 1.10.10 with Vault and I am trying to retrieve variables stored in Vault. I am able to retrieve connections but not variables.

The airflow configuration file contains:

airflow.cfg

[secrets]
backend = airflow.contrib.secrets.hashicorp_vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "secret", "url": "http://127.0.0.1:8200", "auth_type":"token", "token":"token"}

I run the following command on vault side:

vault login token=token
vault secrets enable -path=secret -version=2 kv
vault kv put secret/connections/smtp_default conn_uri=smtps://user:[email protected]:465
vault kv put secret/variables/auth_KEY_vault test=test123
vault secrets list
vault auth list

I run the following dag to test the integration of connections and variables:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.hooks.base_hook import BaseHook


def get_secrets():
    conn = BaseHook.get_connection('smtp_default')
    var2 = Variable.get("auth_KEY_vault")
    print(var2)
    print(f"Password: {conn.password}, Login: {conn.login}, URI: {conn.get_uri()}, Host: {conn.host}")


with DAG('example_secrets_dags', start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    test_task = PythonOperator(
        task_id='test-task',
        python_callable=get_secrets
    )

Which result to the following error:

*** Reading local file: /usr/local/airflow/logs/example_secrets_dags/test-task/2022-03-25T07:42:44.764744+00:00/1.log
[2022-03-25 07:42:54,964] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [queued]>
[2022-03-25 07:42:55,018] {{taskinstance.py:669}} INFO - Dependencies all met for <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [queued]>
[2022-03-25 07:42:55,022] {{taskinstance.py:879}} INFO - 
--------------------------------------------------------------------------------
[2022-03-25 07:42:55,023] {{taskinstance.py:880}} INFO - Starting attempt 1 of 1
[2022-03-25 07:42:55,024] {{taskinstance.py:881}} INFO - 
--------------------------------------------------------------------------------
[2022-03-25 07:42:55,089] {{taskinstance.py:900}} INFO - Executing <Task(PythonOperator): test-task> on 2022-03-25T07:42:44.764744+00:00
[2022-03-25 07:42:55,098] {{standard_task_runner.py:53}} INFO - Started process 1132 to run task
[2022-03-25 07:42:55,439] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: example_secrets_dags.test-task 2022-03-25T07:42:44.764744+00:00 [running]> 5d5d845cb830
[2022-03-25 07:42:55,547] {{logging_mixin.py:112}} INFO - [2022-03-25 07:42:55,546] {base_hook.py:87} INFO - Using connection to: id: smtp_default. Host: relay.example.com, Port: 465, Schema: , Login: user, Password: XXXXXXXX, extra: None
[2022-03-25 07:42:55,573] {{taskinstance.py:1145}} ERROR - 'Variable auth_KEY_vault does not exist'
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/vault_integration_test/vault_integration.py", line 10, in get_secrets
    var2 = Variable.get("auth_KEY_vault")
  File "/usr/local/lib/python3.6/site-packages/airflow/models/variable.py", line 118, in get
    raise KeyError('Variable {} does not exist'.format(key))
KeyError: 'Variable auth_KEY_vault does not exist'
[2022-03-25 07:42:55,586] {{taskinstance.py:1202}} INFO - Marking task as FAILED.dag_id=example_secrets_dags, task_id=test-task, execution_date=20220325T074244, start_date=20220325T074254, end_date=20220325T074255
[2022-03-25 07:43:04,875] {{logging_mixin.py:112}} INFO - [2022-03-25 07:43:04,872] {local_task_job.py:103} INFO - Task exited with return code 1

Any idea about how can I get the variables from vault please ?

Thanks in advance for your help

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

暮年 2025-01-23 20:44:17

我发现了错误,在添加变量时,您必须尊重值参数。因此,不应以这种方式添加变量,

vault kv put secret/variables/auth_KEY_vault test=test123

您应该将 value 传递给值,而不是 test

vault kv put secret/variables/auth_KEY_vault value=test123

I found the error, and while adding the variables you must respect the value parameter. So instead of adding a variable that way

vault kv put secret/variables/auth_KEY_vault test=test123

you should pass value to the value instead of test

vault kv put secret/variables/auth_KEY_vault value=test123
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文