I am trying to run a job on an HPC using multiprocessing. Each process has a peak memory usage of ~44GB. The job class I can use allows 1-16 nodes, each with 32 CPUs and 124GB of memory. Therefore, if I want to run the code as quickly as possible (and within the max walltime limit), I should be able to use 2 CPUs on each node, up to a maximum of 32 across all 16 nodes. However, when I specify mp.Pool(32), the job quickly exceeds the memory limit, I assume because more than two processes end up on a single node.
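The back-of-envelope arithmetic behind that, written out as a quick throwaway check (variable names are only illustrative):

# Quick sanity check of the resource arithmetic described above
peak_mem_per_proc_gb = 44    # ~44GB peak per process
node_mem_gb = 124            # memory per node
n_nodes = 16                 # nodes available to the job class

procs_per_node = node_mem_gb // peak_mem_per_proc_gb   # -> 2 processes fit per node
total_procs = procs_per_node * n_nodes                 # -> 32 processes across 16 nodes
print(procs_per_node, total_procs)                     # 2 32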
My natural instinct was to specify 2 CPUs as the maximum in the PBS script I run my Python script from; however, that configuration is not permitted on the system. I would really appreciate any insight; I have been scratching my head on this one for most of today, and I have faced and worked around similar problems in the past without addressing the fundamentals at play here.
Simplified versions of both scripts below:
#!/bin/sh
#PBS -l select=16:ncpus=32:mem=124gb
#PBS -l walltime=24:00:00
module load anaconda3/personal
source activate py_env
python directory/script.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import multiprocessing as mp


def df_function(df, arr1, arr2):
    df['col3'] = some_algorithm(df, arr1, arr2)
    return df


def parallelize_dataframe(df, func, num_cores):
    df_split = np.array_split(df, num_cores)
    with mp.Pool(num_cores, maxtasksperchild=10 ** 3) as pool:
        df = pd.concat(pool.map(func, df_split))
    return df


def main():
    # Loading input data
    direc = '/home/dir1/dir2/'
    file = 'input_data.csv'
    a_file = 'array_a.npy'
    b_file = 'array_b.npy'
    df = pd.read_csv(direc + file)
    a = np.load(direc + a_file)
    b = np.load(direc + b_file)

    # Globally defining function with keyword defaults
    global f

    def f(df):
        return df_function(df, arr1=a, arr2=b)

    num_cores = 32  # i.e. 2 per node if evenly distributed.

    # Running the function as a multiprocess:
    df = parallelize_dataframe(df, f, num_cores)

    # Saving:
    df.to_csv(direc + 'outfile.csv', index=False)


if __name__ == '__main__':
    main()
1 Answer
To run your job as-is, you could simply request ncpus=32 and then in your Python script set num_cores = 2. Obviously this has you paying for 32 cores and then leaving 30 of them idle, which is wasteful.

The real problem here is that your current algorithm is memory-bound, not CPU-bound. You should be going to great lengths to read only chunks of your files into memory, operate on the chunks, and then write the result chunks to disk to be organized later.
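A minimal sketch of that chunked pattern with plain pandas (not part of the original answer) might look like the following; it assumes some_algorithm can operate on one piece of the dataframe at a time and that the two arrays fit in memory, and chunk_size is a placeholder to tune:

import numpy as np
import pandas as pd

direc = '/home/dir1/dir2/'   # same paths as in the script above
a = np.load(direc + 'array_a.npy')
b = np.load(direc + 'array_b.npy')

chunk_size = 100_000   # placeholder: size so one chunk plus the arrays stays well under the memory limit
first = True
# chunksize makes read_csv yield one DataFrame at a time instead of loading the whole file
for chunk in pd.read_csv(direc + 'input_data.csv', chunksize=chunk_size):
    chunk['col3'] = some_algorithm(chunk, a, b)
    # append each finished chunk to the output file
    chunk.to_csv(direc + 'outfile.csv', mode='a', header=first, index=False)
    first = False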
Fortunately, Dask is built to do exactly this kind of thing. As a first step, you can take out the parallelize_dataframe function and directly load and map your some_algorithm with a dask.dataframe and dask.array:
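The code block that originally accompanied this answer is not preserved here, so the following is only a rough sketch of the idea it describes. It loads the arrays eagerly with np.load for simplicity (a dask.array, e.g. via from_npy_stack, would be the lazier equivalent), and the meta dtype for the new column is an assumption:

import numpy as np
import dask.dataframe as dd

direc = '/home/dir1/dir2/'

# Read the CSV lazily, in partitions, instead of as one big in-memory DataFrame
ddf = dd.read_csv(direc + 'input_data.csv')

# Arrays loaded eagerly here for simplicity; use dask.array if they are too large per worker
a = np.load(direc + 'array_a.npy')
b = np.load(direc + 'array_b.npy')

# Apply some_algorithm to each partition; meta describes the new column (dtype assumed)
ddf['col3'] = ddf.map_partitions(some_algorithm, a, b, meta=('col3', 'float64'))

# Dask writes one CSV file per partition when given a globstring; this triggers the computation
ddf.to_csv(direc + 'outfile-*.csv', index=False)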
That will require some tweaks to some_algorithm, and to_csv and from_np_stack work a bit differently, but you will be able to reasonably run this thing just on your own laptop, and it will scale to your cluster hardware. You can level up from here by using the distributed scheduler, or even deploy it directly to your cluster with dask-jobqueue.
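As a rough idea of that last step (again not from the original answer), a sketch with dask-jobqueue's PBSCluster sized to the job class described in the question; the resource_spec, walltime, and worker sizing are assumptions that would need to match the actual system:

from dask.distributed import Client
from dask_jobqueue import PBSCluster

# One PBS job per node-sized worker; the values mirror the job class above
# (32 CPUs / 124GB per node, ~44GB per task -> 2 tasks per node) and are only placeholders.
cluster = PBSCluster(
    cores=2,                                      # 2 concurrent tasks per worker
    memory='124GB',
    resource_spec='select=1:ncpus=32:mem=124gb',  # what each PBS job actually requests
    walltime='24:00:00',
)
cluster.scale(jobs=16)    # ask PBS for up to 16 such jobs/nodes

client = Client(cluster)
# dask dataframe operations (as in the sketch above) now run on the cluster workers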