Race condition with Python mmap and multiprocessing.Semaphore
I am writing a script that uses multiprocessing.Process to process several mmaps concurrently. Each process updates a result list that is stored in a shared mmap and protected by a semaphore used as a mutex.
The function that writes to the result list looks like this:
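def update_result(result_mmap, new_value, new_value_index, sema):
    # sema is a multiprocessing.Semaphore shared by all worker processes;
    # "with" releases it even if an exception is raised inside the block
    with sema:
        result_mmap.seek(0)
        # mmap reads and writes bytes, so decode before splitting on tabs
        old_result = result_mmap.readline().decode().split("\t")
        old_result[new_value_index] = new_value
        new_result = "\t".join(map(str, old_result)).encode()
        # shrink or grow the mapping so it holds exactly the new row
        result_mmap.resize(len(new_result))
        result_mmap.seek(0)
        result_mmap.write(new_result)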
This works SOMETIMES, but at other times, depending on the order in which the processes run, result_mmap does not seem to be resized properly. I'm not sure where to look from here: I know a race condition exists, but I don't know why.
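A plausible culprit (an assumption, not something the question confirms): mmap.resize only remaps the mmap object it is called on. If every worker holds its own mmap object for the result file, then after one process resizes it the others keep their old length, so readline() in those processes can read past the new end of the row or stop short of it. The sketch below maps one file twice in a single process to stand in for two processes. The function name mapping_lengths_after_resize is hypothetical, and the sketch assumes Linux, where resize is implemented with mremap.

```python
import mmap
import os
import tempfile


def mapping_lengths_after_resize():
    """Map one file twice, resize one mapping, and report what each side sees."""
    fd, path = tempfile.mkstemp()
    try:
        os.write(fd, b"1\t2\t3")  # a 5-byte result row
        # Two independent mmap objects of the same file, standing in for
        # the copies held by two different worker processes.
        a = mmap.mmap(fd, 0)
        b = mmap.mmap(fd, 0)
        a.resize(3)  # truncates the file, but remaps only `a`
        lengths = (len(a), len(b), os.path.getsize(path))
        a.close()
        b.close()
        return lengths
    finally:
        os.close(fd)
        os.remove(path)


if __name__ == "__main__":
    print(mapping_lengths_after_resize())  # (3, 5, 3) on Linux
```

The second mapping still reports 5 bytes even though the file now holds 3, which is the kind of stale view a concurrent reader would get.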
Edit: This is the function that calls update_result:
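def apply_function(mmapped_files, function, result_mmap, result_index, sema):
    for mf in mmapped_files:
        # the first line of each file holds the starting value
        accumulator = int(mf.readline())
        while True:
            line = mf.readline()
            # mmap.readline() returns b"" at the end of the data, never None
            if not line:
                break
            num = int(line)
            accumulator = function(num, accumulator)
        # pass the value first, then its index, matching update_result's signature
        update_result(result_mmap, accumulator, result_index, sema)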
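If resizing is the problem, one way around it is to never resize at all and give every process a fixed-width slot in the result mmap. The sketch below is a swapped-in technique, not the asker's design: it replaces the tab-separated text row with one packed 64-bit integer per slot (an assumption about the result type), uses an anonymous shared mapping, and relies on the fork start method, under which mmap objects are inherited rather than pickled, so it is POSIX-only. SLOT, worker and run are hypothetical names, and sum() stands in for the accumulation function.

```python
import mmap
import multiprocessing as mp
import struct

# One signed 64-bit integer per result slot (assumed result type).
SLOT = struct.Struct("q")


def worker(numbers, result_mmap, index, sema):
    total = sum(numbers)
    # Each process writes only its own fixed-size slot, so the mapping
    # never changes size and no process can hold a stale length.
    with sema:
        SLOT.pack_into(result_mmap, index * SLOT.size, total)


def run(chunks):
    # fork lets children inherit the mmap and semaphore without pickling.
    ctx = mp.get_context("fork")
    # Anonymous mappings are MAP_SHARED by default on Unix, so writes made
    # by the children are visible to the parent.
    result_mmap = mmap.mmap(-1, SLOT.size * len(chunks))
    sema = ctx.Semaphore(1)
    procs = [
        ctx.Process(target=worker, args=(chunk, result_mmap, i, sema))
        for i, chunk in enumerate(chunks)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    results = [SLOT.unpack_from(result_mmap, i * SLOT.size)[0] for i in range(len(chunks))]
    result_mmap.close()
    return results


if __name__ == "__main__":
    print(run([[1, 2, 3], [10, 20], [5]]))  # [6, 30, 5]
```

Because each slot has exactly one writer, the semaphore is not strictly required here; it is kept only to mirror the original structure.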
Comments (1)
Maybe I'm wrong, but are you sure the semaphore really works across processes (is it a system-wide mutex)? If it isn't, it will not protect anything, because processes do not share the same memory space. You might consider using the threading library instead, so that all the threads use the same semaphore.
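The threading route suggested above could look roughly like this. It is a sketch under stated assumptions, not the asker's code: plain lists of numbers stand in for the mmapped files, and accumulate and run_threaded are hypothetical names. One caveat: in standard CPython builds the global interpreter lock keeps CPU-bound pure-Python threads from running in parallel, so this trades speed for simplicity.

```python
import threading


def accumulate(numbers, function, results, index, lock):
    # Same accumulation rule as apply_function: the first value seeds the
    # accumulator, then function(num, accumulator) folds in the rest.
    accumulator = numbers[0]
    for num in numbers[1:]:
        accumulator = function(num, accumulator)
    # Threads share one address space, so a plain list and threading.Lock
    # are enough; no mmap or resize is involved.
    with lock:
        results[index] = accumulator


def run_threaded(chunks, function):
    results = [None] * len(chunks)
    lock = threading.Lock()
    threads = [
        threading.Thread(target=accumulate, args=(chunk, function, results, i, lock))
        for i, chunk in enumerate(chunks)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_threaded([[1, 2, 3], [4, 5]], lambda num, acc: num + acc))  # [6, 9]
```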