Can Python defaultdict behavior be achieved with multiprocessing?
I'm not sure if this is possible (hoping so). I have a dataset that I run through a process that uses defaultdict. If you look up something that isn't in a defaultdict, it gets added. In my case I look up values and they get added. Later I look those values up again, and if they are in the dict I set the value from the default False to True. This works with no problems, but I start getting incorrect results once I try to multiprocess it. The real data and process are pretty large, and I have multicore hardware, so why not use it, right? Here are my results. The size of the table with multiprocessing always seems to change: sometimes it matches the size without multiprocessing, but often it's slightly smaller.
```
size of Table(with multiprocesing) is: 398
total number of true(with multiprocesing) is 0
size of Table(without multiprocesing) is 402
total number of true(without multiprocessing) is 250
```
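To make the defaultdict behavior described above concrete, here is a small Python 3 sketch. The question's code is Python 2, but `defaultdict` behaves the same way in both. Reading a missing key with `[]` inserts it with the default value, while `in` and `.get()` do not. That is why the table accumulates `False` entries as the loops probe it. The helper name `probe_table` is illustrative only.

```python
from collections import defaultdict

def probe_table():
    """Show which defaultdict operations insert missing keys."""
    T = defaultdict(bool)   # missing keys default to False
    T[0, 0] = True          # base case, as in the question

    sizes = [len(T)]        # one entry so far
    _ = (5, 0) in T         # membership test: does NOT insert
    sizes.append(len(T))
    _ = T.get((6, 0))       # .get(): does NOT insert either
    sizes.append(len(T))
    value = T[7, 0]         # [] lookup of a missing key: inserts (7, 0) -> False
    sizes.append(len(T))
    return value, sizes, dict(T)

value, sizes, snapshot = probe_table()
print(value)     # False
print(sizes)     # [1, 1, 1, 2]
print(snapshot)  # {(0, 0): True, (7, 0): False}
```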
Anyway, here's some working code (Python 2). The top part uses multiprocessing and the bottom part does not. I figured out how to share the defaultdict with all the new processes, but it still doesn't work:
```python
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def test(i,x, T):
    target_sum = 100
    # T[x, i] is True if 'x' can be solved
    # by a linear combination of data[:i+1]
    #T = defaultdict(bool) # all values are False by default
    T[0, 0] = True # base case
    for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
        #print s
        for c in range(s / x + 1):
            if T[s - c * x, i]:
                T[s, i + 1] = True

data = [2,5,8]
pool = Pool(processes=2)
mgr = MyManager()
mgr.start()
T = mgr.defaultdict(bool)
T[0, 0] = True
for i, x in enumerate(data): # i is index, x is data[i]
    pool.apply_async(test, (i,x, T))
pool.close()
pool.join()
pool.terminate()
print 'size of Table(with multiprocesing) is:', len(T)
count_of_true = []
for x in T.items():
    if T[x] == True:
        count_of_true.append(x)
print 'total number of true(with multiprocesing) is ', len(count_of_true)

#now lets try without multiprocessing
target_sum = 100
# T[x, i] is True if 'x' can be solved
# by a linear combination of data[:i+1]
T1 = defaultdict(bool) # all values are False by default
T1[0, 0] = True # base case
for i, x in enumerate(data): # i is index, x is data[i]
    for s in range(target_sum + 1): #set the range of one higher than sum to include sum itself
        for c in range(s / x + 1):
            if T1[s - c * x, i]:
                T1[s, i + 1] = True
print 'size of Table(without multiprocesing) is ', len(T1)
count = []
for x in T1:
    if T1[x] == True:
        count.append(x)
print 'total number of true(without multiprocessing) is ', len(count)
```
I hope there's a solution for this. For the past two weeks I tried running this against a database, but it's too slow with very, very large datasets. The process above handles everything in memory, but it still takes a few hours to run on my test data. That's why I want to use multiple cores for it.
Comments (3)
The behavior of `defaultdict` is easy to replicate using a standard `dict`. In this case, it looks to me like you could simply replace this line in `test`:

with this line:

See if you can get this code working with a standard dictionary before bothering to customize a Manager object.
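Here is a minimal Python 3 sketch of that suggestion. It assumes the line to replace is the `if T[s - c * x, i]:` lookup in `test`. The sketch uses `dict.get` with a `False` default, so probing a key never adds it. It also uses `//` because `/` is true division in Python 3. `fill_row` is a hypothetical stand-in for the question's `test` function. When the stages run one after another on a plain dict, the table ends up holding only `True` entries.

```python
def fill_row(i, x, T, target_sum=100):
    """Fill row i + 1 of T from row i, using a plain dict instead of defaultdict."""
    for s in range(target_sum + 1):
        for c in range(s // x + 1):          # // keeps integer division in Python 3
            # .get() returns False for a missing key without inserting it
            if T.get((s - c * x, i), False):
                T[s, i + 1] = True

data = [2, 5, 8]
T = {(0, 0): True}                           # base case
for i, x in enumerate(data):                 # run the stages in order
    fill_row(i, x, T)

print(len(T))          # 250: only reachable (sum, row) pairs are stored
print(all(T.values())) # True
```

The count of `True` entries matches the question's non-multiprocessing result. The table size no longer includes the `False` entries that defaultdict lookups used to create.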
But actually, it seems that each value of `i` stores values that may be read by the loop that handles `i + 1`. This means the results of each loop depend on the previous loop, so an asynchronous approach may produce errors. To expand on this, try this code:

The output is as follows:

So as you can see, the order of execution is sequential if you call `p.wait()` and out of order if you don't. And because it's out of order, you'll notice that not all calculations for `i = 0` are complete before the `i = 1` and `i = 2` calculations begin. That may sometimes mean that an `i = 0` calculation writes to a key that `i = 1` uses, but only after the `i = 1` calculation has already read that key. (And indeed, although the table is the same length in the above example, you'll notice that the length of the `order` list is different. So something different is happening, even when it doesn't affect the final result.)
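The dependency can be reproduced without any processes. The Python 3 sketch below runs the same per-stage loop over one shared plain dict, but lets you choose the order in which the stages `i` run. `fill_table` and its `order` parameter are hypothetical names for illustration. Running the stages as 0, 1, 2 matches the single-process result. Running a later stage first means it reads row `i` before that row has been written, which is the kind of interleaving `apply_async` can produce.

```python
def fill_table(data, order, target_sum=100):
    """Run the per-stage loop for each index in `order`; return the number of True entries."""
    T = {(0, 0): True}                       # base case
    for i in order:
        x = data[i]
        for s in range(target_sum + 1):
            for c in range(s // x + 1):
                # Stage i only reads row i, which stage i - 1 is responsible for writing.
                if T.get((s - c * x, i), False):
                    T[s, i + 1] = True
    return sum(1 for v in T.values() if v)

data = [2, 5, 8]
print(fill_table(data, [0, 1, 2]))  # 250: each stage sees a finished previous row
print(fill_table(data, [2, 1, 0]))  # 52: stages 2 and 1 ran before their input rows existed
```

Within one stage, the iterations over `s` only read row `i` and only write row `i + 1`. That inner loop is therefore the part that could be split across workers without running into this ordering problem.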
I did a quick check and, as I suspected, the child process is creating its own version of T. You need to set up a global variable and let the manager update it.

I added this in the test function to see what the id of T was:

address of T 823f50
address of T 955550
address of T 955bd0

So when the children complete, the parent never gets the updates.

I'll play with it a bit to set up a global or process-shared dict.
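Here is a minimal Python 3 sketch of the process-shared dict mentioned above, using the standard `multiprocessing.Manager().dict()`. It assumes a POSIX system where the `fork` start method is available; Windows only supports `spawn`. `record_square` and `fill_shared_dict` are hypothetical names. Writes made through the proxy inside a child are forwarded to the single dict that lives in the manager process, so the parent can read them after the children are joined.

```python
import multiprocessing as mp

def record_square(shared, i):
    # Runs in a child process. `shared` is a DictProxy, so this assignment is
    # forwarded to the manager process instead of a private copy of the dict.
    shared[i] = i * i

def fill_shared_dict(n):
    ctx = mp.get_context("fork")             # assumption: POSIX "fork" start method
    with ctx.Manager() as manager:
        shared = manager.dict()
        workers = [ctx.Process(target=record_square, args=(shared, i))
                   for i in range(n)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()                         # wait until every child has written
        return shared.copy()                 # plain dict, copied before the manager shuts down

if __name__ == "__main__":
    print(sorted(fill_shared_dict(4).items()))  # [(0, 0), (1, 1), (2, 4), (3, 9)]
```

The result is sorted before printing because the children may finish, and therefore write, in any order.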
There were a few things I fixed:

I removed T from the function calls; this was killing the process variable you defined as `manager.defaultdict(bool)`.

Edit: Actually, I just realized that T was global because there is no `def main`, so I restored T to the function calls. Sorry about that. :)

Edit 2: I also added `p.wait()` after your async call. I think this may be where you were seeing the drops. I noticed the same drops, but adding `p.wait()` appears to have stopped the drops from the children.

Edit 3: Changed `p.wait()` to `p.get(timeout=5)`.
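`AsyncResult.wait()` only blocks. `AsyncResult.get()` also returns the worker's result and re-raises any exception the worker hit. If you call `apply_async` and never call `get()`, those exceptions are silently dropped. The Python 3 sketch below uses `multiprocessing.pool.ThreadPool`, which offers the same `apply_async`/`AsyncResult` interface as `Pool`, so it runs without spawning processes. `risky` and `collect_results` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool

def risky(i):
    # Hypothetical worker that fails for one input.
    if i == 2:
        raise ValueError("bad input %d" % i)
    return i * 10

def collect_results(pool_size=2):
    results, errors = [], []
    with ThreadPool(pool_size) as pool:
        handles = [pool.apply_async(risky, (i,)) for i in range(4)]
        for h in handles:
            try:
                # get() waits (up to the timeout) and re-raises the worker's
                # exception; wait() would block but never surface the error.
                results.append(h.get(timeout=5))
            except ValueError as exc:
                errors.append(str(exc))
    return results, errors

print(collect_results())  # ([0, 10, 30], ['bad input 2'])
```

Calling `get()` immediately after each `apply_async` would also make the stages run one at a time. That is consistent with the observation that waiting stopped the drops, at the cost of the parallelism.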
You only need to pass the function's arguments, not the global variables.

Also, in your loop, the result of T once it completes is:

T = defaultdict(<type 'bool'>, {(7, 3): True, (90, 0): False, ... etc })

So I changed the for loop to pick up each key and value.
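That loop change also explains the count of 0 in the question. Iterating `T.items()` yields `(key, value)` pairs, so `T[x]` looks up the whole pair as a key. On a `defaultdict(bool)`, that lookup misses, inserts the pair with `False`, and returns `False`. The Python 3 sketch below runs both loops on an ordinary `defaultdict`; a managed proxy is not needed to show the effect. The table contents are made up for illustration.

```python
from collections import defaultdict

T = defaultdict(bool)
T[0, 0] = True
T[7, 1] = True
T[90, 0] = False

# Buggy version: x is a (key, value) pair, e.g. ((0, 0), True).
# list(...) snapshots the items (in Python 2, items() already returned a list),
# because the T[x] lookup below inserts new keys while we iterate.
count_buggy = 0
for x in list(T.items()):
    if T[x] == True:          # ((0, 0), True) is missing, so it is inserted as False
        count_buggy += 1

# Fixed version: unpack each pair and test the value directly.
count_fixed = sum(1 for key, value in T.items() if value)

print(count_buggy)  # 0
print(count_fixed)  # 2
print(len(T))       # 6: three real entries plus three junk pair keys
```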