How to dynamically build a resources (V1ResourceRequirements) object for a Kubernetes pod in Airflow

Posted on 2025-01-09 13:02:41

I'm currently migrating a DAG from Airflow version 1.10.10 to 2.0.0.

This DAG uses a custom Python operator that assigns resources dynamically, depending on the complexity of the task.
The problem is that the import used in v1.10.10 (from airflow.contrib.kubernetes.pod import Resources) no longer works. I read that for v2.0.0 I should use kubernetes.client.models.V1ResourceRequirements, but I need to build this resource object dynamically.
This might sound dumb, but I haven't been able to find the correct way to build this object.

For example, I've tried with

            self.resources = k8s.V1ResourceRequirements(
                request_memory=get_k8s_resources_mapping(resource_request)['memory'],
                limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
                request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
                limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
            )

or

            self.resources = k8s.V1ResourceRequirements(
                requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
                          'memory': get_k8s_resources_mapping(resource_request)['memory']},
                limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
                        'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
            )

(get_k8s_resources_mapping(resource_request)['xxxx'] just returns a value depending on the resource_request, like '2Gi' for memory or '2' for cpu)

But they don't seem to work. The task fails.

So, my question is: how would you correctly build a V1ResourceRequirements object in Python?
And how should it look in the executor_config attribute of the task instance? Something like this, maybe?

'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}

Comments (1)

许你一世情深 2025-01-16 13:02:41

The proper syntax is:

For apache-airflow-providers-cncf-kubernetes>=5.3.0:

from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

KubernetesPodOperator(
    ...,
    container_resources = client.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "8G"},
        limits={"cpu": "16000m", "memory": "128G"}
    )
)

If you would like to generate it dynamically, simply replace the values in requests/limits with calls to a function that returns the expected string.
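
As a minimal sketch of that idea: the helper below builds the requests/limits dictionaries from a lookup table and passes them to V1ResourceRequirements. The profile names and quantities in RESOURCE_PROFILES are made up for illustration, and get_k8s_resources_mapping is only a stand-in for the helper described in the question. As far as I know, the constructor takes requests and limits as keyword arguments, which would explain why flat names such as request_memory are rejected.

```python
# Hypothetical mapping from a task-complexity label to Kubernetes quantities.
# The labels and values are illustrative, not taken from the original DAG.
RESOURCE_PROFILES = {
    "small": {"cpu": "500m", "memory": "512Mi", "cpu_l": "1", "memory_l": "1Gi"},
    "large": {"cpu": "2", "memory": "2Gi", "cpu_l": "4", "memory_l": "4Gi"},
}


def get_k8s_resources_mapping(resource_request):
    """Stand-in for the question's helper: returns quantity strings."""
    return RESOURCE_PROFILES[resource_request]


def build_resource_kwargs(resource_request):
    """Build the requests/limits dicts expected by V1ResourceRequirements."""
    mapping = get_k8s_resources_mapping(resource_request)
    return {
        "requests": {"cpu": mapping["cpu"], "memory": mapping["memory"]},
        "limits": {"cpu": mapping["cpu_l"], "memory": mapping["memory_l"]},
    }


def build_resource_requirements(resource_request):
    """Return a V1ResourceRequirements object for the given profile."""
    # Imported lazily so the dict-building part works without the client installed.
    from kubernetes import client

    return client.V1ResourceRequirements(**build_resource_kwargs(resource_request))
```

The result of build_resource_requirements("large") can then be passed as container_resources to KubernetesPodOperator, as in the example above.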


Below are changes needed for the code to work on earlier versions.

For apache-airflow-providers-cncf-kubernetes<5.3.0 and >=4.2.0:

Change the import path to:

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

For apache-airflow-providers-cncf-kubernetes<4.2.0:

Change container_resources to resources.
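
If the same code has to run against several provider versions, the rules above could be captured in a small helper. This is only a sketch: pod_operator_settings and parse_version are hypothetical names, the version thresholds are the ones quoted in this answer, and the parser assumes plain numeric versions such as "5.3.0".

```python
def parse_version(version):
    """Turn a plain numeric version such as '5.3.0' into a 3-tuple of ints."""
    parts = [int(part) for part in version.split(".")[:3]]
    parts += [0] * (3 - len(parts))  # pad '4.2' to (4, 2, 0)
    return tuple(parts)


def pod_operator_settings(provider_version):
    """Return (import module, resources keyword) for a provider version.

    Thresholds follow the answer above: the module was renamed in 5.3.0
    and the keyword became container_resources in 4.2.0.
    """
    version = parse_version(provider_version)
    if version >= (5, 3, 0):
        module = "airflow.providers.cncf.kubernetes.operators.pod"
    else:
        module = "airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
    keyword = "container_resources" if version >= (4, 2, 0) else "resources"
    return module, keyword
```

For example, the returned keyword can be used to build the operator arguments as `{keyword: resources_object}` and unpack them with `**` when instantiating KubernetesPodOperator.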
