Python & multiprocessing: breaking set generation down into sub-processes
I've got to generate a set of strings based on some calculations over other strings. This takes quite a while, and I'm working on a multiprocessor/multicore server, so I figured I could break these tasks down into chunks and pass them off to different processes.
First, I break the original list of strings into chunks of 10000 each, send each chunk off to a process which creates a new set, then try to obtain a lock and report the results back to the master process. However, my master process's set is empty!
Here's some code:
import multiprocessing
import time as T

# (Both functions below are methods of a larger class, elided here.)

def build_feature_labels(self, strings, return_obj, l):
    # Worker: build the partial set for one chunk of strings.
    feature_labels = set()
    for s in strings:
        feature_labels = feature_labels.union(s.get_feature_labels())
    print "method: ", len(feature_labels)
    l.acquire()
    return_obj.return_feature_labels(feature_labels)
    l.release()
    print "Thread Done"

def return_feature_labels(self, labs):
    # Intended to merge a worker's partial set into the master's set.
    self.feature_labels = self.feature_labels.union(labs)
    print "length self", len(self.feature_labels)
    print "length labs", len(labs)

# Driver code (runs in the master process):
current_pos = 0
lock = multiprocessing.Lock()
while current_pos < len(orig_strings):
    while len(multiprocessing.active_children()) > threads:
        print "WHILE: cpu count", str(multiprocessing.cpu_count())
        T.sleep(30)
    print "number of processes", str(len(multiprocessing.active_children()))
    proc = multiprocessing.Process(
        target=self.build_feature_labels,
        args=(orig_strings[current_pos:current_pos + self.MAX_ITEMS], self, lock))
    proc.start()
    current_pos = current_pos + self.MAX_ITEMS

while len(multiprocessing.active_children()) > 0:
    T.sleep(3)
print len(self.feature_labels)
What is strange is that self.feature_labels on the master process is empty, but when it is called from each sub-process it has items. I think I'm taking the wrong approach here (it's how I used to do it in Java!). Is there a better approach?
Thanks in advance.
Comments (3)
Consider using a pool of workers: http://docs.python.org/dev/library/multiprocessing.html#using-a-pool-of-workers. This does a lot of the work for you in a map-reduce style and returns the assembled results.
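For example, a minimal sketch of that approach. The names orig_strings and get_feature_labels are placeholders borrowed from the question, and the chunking is an assumption, not a tested drop-in:

import multiprocessing

CHUNK_SIZE = 10000  # same chunk size the question uses

def build_feature_labels(strings):
    # Worker: build the partial set for one chunk of strings.
    labels = set()
    for s in strings:
        labels |= s.get_feature_labels()
    return labels

if __name__ == '__main__':
    chunks = [orig_strings[i:i + CHUNK_SIZE]
              for i in range(0, len(orig_strings), CHUNK_SIZE)]
    pool = multiprocessing.Pool()            # defaults to one worker per core
    partial_sets = pool.map(build_feature_labels, chunks)  # the "map" step
    pool.close()
    pool.join()
    feature_labels = set().union(*partial_sets)            # the "reduce" step
    print(len(feature_labels))

Note that each chunk and each returned set crosses the process boundary by pickling, so the string objects involved must be picklable.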
Use a multiprocessing.Pipe, or Queue (or other such object) to pass data between processes. Use a Pipe to pass data between two processes, and a Queue to allow multiple producers and consumers.
Along with the official documentation, there are nice examples to be found in Doug Hellman's multiprocessing tutorial. In particular, it has an example of how to use multiprocessing.Pool to implement a mapreduce-type operation. It might suit your purposes very well.
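For instance, a rough sketch of the Queue pattern applied to this problem (again, orig_strings and get_feature_labels stand in for the question's objects):

import multiprocessing

def worker(strings, q):
    # Build the partial set for this chunk, then ship it back to the master.
    labels = set()
    for s in strings:
        labels |= s.get_feature_labels()
    q.put(labels)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    chunks = [orig_strings[i:i + 10000]
              for i in range(0, len(orig_strings), 10000)]
    procs = [multiprocessing.Process(target=worker, args=(chunk, q))
             for chunk in chunks]
    for p in procs:
        p.start()
    feature_labels = set()
    for _ in procs:                 # collect exactly one result per worker
        feature_labels |= q.get()
    for p in procs:                 # drain the queue before joining --
        p.join()                    # joining first can deadlock on large items
    print(len(feature_labels))

Collecting the results before calling join matters: a child that is still flushing a large item into the queue's pipe will block, and joining it first can deadlock.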
Why it didn't work: multiprocessing uses processes, and process memory isn't shared. Multiprocessing can set up shared memory or pipes for IPC, but it must be done explicitly. This is how the various suggestions send data back to the master.
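A tiny self-contained illustration of that point:

import multiprocessing

data = set()

def child():
    data.add('added in child')
    print(len(data))    # 1 -- the child mutated its own copy of the set

if __name__ == '__main__':
    p = multiprocessing.Process(target=child)
    p.start()
    p.join()
    print(len(data))    # 0 -- the master's set never saw the change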