multiprocessing.pool.map and a function with two arguments

I am using multiprocessing.Pool()

Here is the function I want to run with the Pool:

def insert_and_process(file_to_process, db):
    db = DAL("path_to_mysql" + db)
    # table definitions
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process, file_list, db)  # here is the problem

I want to pass two arguments. What I want to do is initialize only four DB connections (as written, it would try to create a connection on every function call, so possibly millions of them, freezing everything on I/O). If I could create four DB connections, one for each process, that would be fine.

Is there any solution for Pool, or should I abandon it?

EDIT:

With help from both of you, I got this:

args=zip(f,cycle(dbs))
Out[-]: 
[('f1', 'db1'),
 ('f2', 'db2'),
 ('f3', 'db3'),
 ('f4', 'db4'),
 ('f5', 'db1'),
 ('f6', 'db2'),
 ('f7', 'db3'),
 ('f8', 'db4'),
 ('f9', 'db1'),
 ('f10', 'db2'),
 ('f11', 'db3'),
 ('f12', 'db4')]

So here is how it's going to work: I'll move the DB connection code out to the main level and do this:

from itertools import cycle

def process_and_insert(args):
    # unpack the (file, db_connection) tuple
    file_to_process, db = args
    # table definitions
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4)

    # four connections, matching the four pool processes
    # (note: the connection objects must be picklable to be sent to the workers)
    dbs = [DAL("path_to_mysql/database") for i in range(4)]
    args = zip(file_list, cycle(dbs))
    P.map(process_and_insert, args)

Yeah, I'm going to test it out and let you guys know.

后来的我们 2024-12-28 04:33:23

The Pool documentation does not mention a way of passing more than one parameter to the target function - I've tried just passing a sequence, but it does not get unpacked (one item of the sequence per parameter).

However, you can write your target function to expect the first (and only) parameter to be a tuple, in which each element is one of the parameters you are expecting:

from itertools import repeat

def insert_and_process((file_to_process,db)):
    db = DAL("path_to_mysql" + db)
    # table definitions
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(Note the extra parentheses in the definition of insert_and_process - Python treats that as a single parameter that should be a 2-item sequence. The first element of the sequence is assigned to the first variable, and the other to the second.)
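
Note that tuple parameters in a def signature are Python 2 only; they were removed in Python 3 (PEP 3113). On Python 3 you can unpack inside the function, or use Pool.starmap (available since Python 3.3), which unpacks each argument tuple into positional arguments for you. A minimal sketch, assuming the question's DAL and parse_file are available and the db value is a placeholder:

import os
from itertools import repeat
from multiprocessing import Pool

def insert_and_process(file_to_process, db):
    # DAL and parse_file come from the question's code
    conn = DAL("path_to_mysql" + db)
    conn.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    db = "database"  # placeholder for whatever suffix DAL expects
    P = Pool(processes=4)
    # starmap unpacks each (file, db) tuple into two positional arguments
    P.starmap(insert_and_process, zip(file_list, repeat(db)))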

想挽留 2024-12-28 04:33:23

Your pool will spawn four processes, each run by its own instance of the Python interpreter. You can use a global variable to hold your database connection object, so that exactly one connection is created per process:

global_db = None

def insert_and_process(file_to_process, db):
    global global_db
    if global_db is None:
        # If this is the first time this function is called within this
        # process, create a new connection.  Otherwise, the global variable
        # already holds a connection established by a former call.
        global_db = DAL("path_to_mysql" + db)
    global_db.table.insert(**parse_file(file_to_process))
    return True

Since Pool.map() and friends only support one-argument worker functions, you need to create a wrapper that forwards the work:

def insert_and_process_helper(args):
    return insert_and_process(*args)

if __name__ == "__main__":
    file_list=os.listdir(".")
    db = "wherever you get your db"
    # Create argument tuples for each function call:
    jobs = [(file, db) for file in file_list]
    P = Pool(processes=4)
    P.map(insert_and_process_helper, jobs)
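
A related option is the pool's initializer hook, which runs once in each worker process when the pool starts; it guarantees one connection per process without the lazy None check. A sketch under the same assumptions (DAL and parse_file as in the question, placeholder connection string):

import os
from multiprocessing import Pool

db_conn = None  # set once per worker process by init_worker

def init_worker(db):
    # Executed once in each of the 4 worker processes at pool startup,
    # so exactly one connection is opened per process.
    global db_conn
    db_conn = DAL("path_to_mysql" + db)

def insert_and_process(file_to_process):
    db_conn.table.insert(**parse_file(file_to_process))
    return True

if __name__ == "__main__":
    file_list = os.listdir(".")
    P = Pool(processes=4, initializer=init_worker, initargs=("database",))
    P.map(insert_and_process, file_list)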
神也荒唐 2024-12-28 04:33:23

No need to use zip. If, for example, you have two parameters, x and y, and each of them can take several values, like:

X=range(1,6)
Y=range(10)

The function should take only one parameter and unpack it inside:

def func(params):
    (x,y)=params
    ...

And you call it like this:

params = [(x,y) for x in X for y in Y]
pool.map(func, params)
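
itertools.product builds the same cross product without spelling out the comprehension, and it generates the pairs lazily. A small sketch with a placeholder worker body:

from itertools import product
from multiprocessing import Pool

def func(params):
    x, y = params
    return x * y  # placeholder body

if __name__ == "__main__":
    X = range(1, 6)
    Y = range(10)
    pool = Pool(processes=4)
    # product(X, Y) yields the same (x, y) pairs as
    # [(x, y) for x in X for y in Y], one at a time
    results = pool.map(func, product(X, Y))
    print(results)
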
破晓 2024-12-28 04:33:23

You can use functools.partial for this purpose:

from functools import partial

func = partial(rdc, lat, lng)
r = pool.map(func, range(8))

with rdc defined as:

def rdc(lat, lng, x):
    pass
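
Put together as a self-contained sketch (lat, lng, and the body of rdc are placeholders): partial freezes the leading positional arguments, so the value supplied by map arrives as the final parameter x.

from functools import partial
from multiprocessing import Pool

def rdc(lat, lng, x):
    # lat and lng are fixed by partial; x varies per task
    return (lat + x, lng + x)

if __name__ == "__main__":
    lat, lng = 52.5, 13.4  # placeholder coordinates
    func = partial(rdc, lat, lng)  # binds lat and lng
    pool = Pool(processes=4)
    r = pool.map(func, range(8))  # each call receives one x
    print(r)
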
吖咩 2024-12-28 04:33:23

Using

params=[(x,y) for x in X for y in Y]

you materialize the full list of (x, y) pairs in memory up front, and that may be slower than using

from itertools import repeat
P.map(insert_and_process,zip(file_list,repeat(db)))
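
A minimal sketch of the difference, with a stand-in object: repeat hands back the same object each time instead of building all pairs up front, so zip can feed the pool lazily.

from itertools import repeat

db = object()  # stand-in for a connection or path
file_list = ["f1", "f2", "f3"]

# repeat(db) is an endless lazy iterator; zip stops at the shorter input
pairs = list(zip(file_list, repeat(db)))
assert all(d is db for _, d in pairs)  # the very same object, never copied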