如何在airflow中为kubernetes pod动态构建资源(V1ResourceRequirements)对象
我目前正在将 DAG 从气流版本 1.10.10 迁移到 2.0.0。
该 DAG 使用自定义 Python 运算符,根据任务的复杂性动态分配资源。 问题是 v1.10.10 中使用的导入(airflow.contrib.kubernetes.pod import Resources)不再有效。我读到,对于 v2.0.0,我应该使用 kubernetes.client.models.V1ResourceRequirements,但我需要动态构建此资源对象。 这可能听起来很愚蠢,但我一直无法找到构建这个对象的正确方法。
例如,我尝试使用
self.resources = k8s.V1ResourceRequirements(
request_memory=get_k8s_resources_mapping(resource_request)['memory'],
limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
)
or
self.resources = k8s.V1ResourceRequirements(
requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
'memory': get_k8s_resources_mapping(resource_request)['memory']},
limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
)
(get_k8s_resources_mapping(resource_request)['xxxx'] 只是根据resource_request返回一个值,例如内存的“2Gi”或CPU的“2”)
但它们似乎不起作用。任务失败。
所以,我的问题是,如何在 Python 中正确构建 V1ResourceRequirements ? 并且,它在任务实例的 executor_config 属性中应该是什么样子?也许是这样的?
'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
I'm currently migrating a DAG from airflow version 1.10.10 to 2.0.0.
This DAG uses a custom python operator where, depending on the complexity of the task, it assigns resources dynamically.
The problem is that the import used in v1.10.10 (airflow.contrib.kubernetes.pod import Resources) no longer works. I read that for v2.0.0 I should use kubernetes.client.models.V1ResourceRequirements, but I need to build this resource object dynamically.
This might sound dumb, but I haven't been able to find the correct way to build this object.
For example, I've tried with
self.resources = k8s.V1ResourceRequirements(
request_memory=get_k8s_resources_mapping(resource_request)['memory'],
limit_memory=get_k8s_resources_mapping(resource_request)['memory_l'],
request_cpu=get_k8s_resources_mapping(resource_request)['cpu'],
limit_cpu=get_k8s_resources_mapping(resource_request)['cpu_l']
)
or
self.resources = k8s.V1ResourceRequirements(
requests={'cpu': get_k8s_resources_mapping(resource_request)['cpu'],
'memory': get_k8s_resources_mapping(resource_request)['memory']},
limits={'cpu': get_k8s_resources_mapping(resource_request)['cpu_l'],
'memory': get_k8s_resources_mapping(resource_request)['memory_l']}
)
(get_k8s_resources_mapping(resource_request)['xxxx'] just returns a value depending on the resource_request, like '2Gi' for memory or '2' for cpu)
But they don't seem to work. The task fails.
So, my question is, how would you go about correctly building a V1ResourceRequirements in Python?
And, how should it look in the executor_config attribute of the task instance? Something like this, maybe?
'resources': {'limits': {'cpu': '1', 'memory': '512Mi'}, 'requests': {'cpu': '1', 'memory': '512Mi'}}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
正确的语法是:
对于 apache-airflow-providers-cncf-kubernetes>=5.3.0:
如果您想动态生成它,只需将 requests/limits 中的值替换为返回预期字符串。
以下是代码在早期版本上运行所需的更改。
对于 apache-airflow-providers-cncf-kubernetes<5.3.0 和 >=4.2.0:
将导入路径更改为:
对于apache-airflow-providers-cncf-kubernetes<4.2。 0:
将
container_resources
更改为resources
The The proper syntax is:
For apache-airflow-providers-cncf-kubernetes>=5.3.0:
If you would like to generate it dynamically simply replace the values in requests/limits with function that returns the expected string.
Below are changes needed for the code to work on earlier versions.
For apache-airflow-providers-cncf-kubernetes<5.3.0 and >=4.2.0:
change import path to:
For apache-airflow-providers-cncf-kubernetes<4.2.0:
Change
container_resources
toresources