返回介绍

9.7 同步文件和变量访问

发布于 2024-01-25 21:44:08 字数 8047 浏览 0 评论 0 收藏 0

在下面的例子中,我们会看看多进程共享和操控一个状态——在这种情况下,4个进程以一定次数递增一个共享的计数器。缺少同步过程的话,计数就是不正确的。如果你要以一种一致性的方式来共享数据的话,你总是需要一个方法来同步数据的读写,不然你就会在错误中结束。

典型情况下,同步方法和你所使用的特定操作系统(OS)息息相关,而且它们还常常和你所使用的特定语言息息相关。在这里,我们就看看使用Python库的基于文件的同步,在Python进程间共享一个整数对象。

9.7.1 文件锁

读写一个文件是在本节中共享数据的最慢的例子。

你可以在例9-33中看看我们第一个工作函数。该函数在一个局部计数器上做迭代。在每一次迭代中,它打开了一个文件,读取已存在的值,自增1,然后用新的值覆写掉老的值。在第一次迭代中,文件会是空的或者不存在,所以它会捕捉一个异常并假设值应该为零。

例9-33 没有锁的工作函数

def work(filename, max_count):
  for n in range(max_count):
    f = open(filename, "r")
    try:
      nbr = int(f.read())
    except ValueError as err:
      print "File is empty, starting to count from 0, error: " + str(err)
      nbr = 0
    f = open(filename, "w")
    f.write(str(nbr + 1) + '\n')
    f.close()

让我们用一个进程来运行这个例子。你能在例9-34中看见输出。工作函数被调用了1000次,正如所期望的那样,它计数正确,没有损失任何数据。在第一次读取时,见到了一个空文件。这会为int()抛出invalid literal for int()的错误(因为在一个空字符串上调用了int())。这个错误只发生了一次,之后,我们总是会有一个合法的值用于读取并把它转变成一个整数。

例9-34 不用锁,用一个进程来做基于文件的计数的用时

$ python ex1_nolock.py
Starting 1 process(es) to count to 1000
File is empty, starting to count from 0,
error: invalid literal for int() with base 10: ''
Expecting to see a count of 1000
count.txt contains:
1000

现在我们将用4个并发进程来运行相同的工作函数。我们没有任何加锁的代码,所以我们将期望有一些奇怪的结果。

 问题 

在你查看下面的代码前,当两个进程同时从相同的文件读取或写入的时候,你会期待看见哪两种类型的错误呢?思考一下代码的两种主要状态(每个进程的开始执行处和每个进程的正常运行状态)。

瞧例9-35来看这个问题。首先,当每个进程启动时,文件是空的,所以它们都设法从零开始计数。第二,当一个进程写时,另一个进程能够读到一个部分写完的不能被解析的结果。这会导致异常,就会写回零。这样依次进行,导致我们的计数器保持在重置状态!你能看到\n和两个值如何被两个并发进程写入到同样的打开文件中,导致第三个进程读取了一个无效项吗?

例9-35 不用锁,使用4个进程基于文件的计数的用时

$ python ex1_nolock.py
Starting 4 process(es) to count to 4000
File is empty, starting to count from 0,
error: invalid literal for int() with base 10: ''
File is empty, starting to count from 0,
error: invalid literal for int() with base 10: '1\n7\n'
# many errors like these
Expecting to see a count of 4000
count.txt contains:
629
$ python -m timeit -s "import ex1_nolock" "ex1_nolock.run_workers()"
10 loops, best of 3: 125 msec per loop

例9-36展示了用4个进程调用工作函数的multiprocessing代码。注意我们没有使用map,而是构建了一个Process对象的列表。尽管我们在这里不使用它的功能性,但Process对象给予我们能力来内省每个进程的状态。我们鼓励你读读文档来学习为什么你可能想要使用Process。

例9-36 run_workers设置4个进程

import multiprocessing import os ... MAX_COUNT_PER_PROCESS = 1000 FILENAME = "count.txt" ... def run_workers(): NBR_PROCESSES = 4 total_expected_count = NBR_PROCESSES * MAX_COUNT_PER_PROCESS print "Starting {} process(es) to count to {}".format(NBR_PROCESSES, total_expected_count) # reset counter f = open(FILENAME, "w") f.close() processes = [] for process_nbr in range(NBR_PROCESSES): p = multiprocessing.Process(target=work, args=(FILENAME, MAX_COUNT_PER_PROCESS)) p.start() processes.append(p) for p in processes: p.join() print "Expecting to see a count of {}".format(total_expected_count) print "{} contains:".format(FILENAME) os.system('more ' + FILENAME) if __name__ == "__main__": run_workers()

使用lockfile模块,我们能够引入一种同步方法,这样在同一时刻只有一个进程写,其他进程都要等待轮到它们的时候。因此整体过程运行得更慢,但不会犯错。你可以在例9-37中看到正确的输出。你会发现在线的完整文档。注意加锁机制和Python息息相关,这样其他正在查看这个文件的进程不会关心这个文件已经“被加锁”的本质。

例9-37 使用锁和4个进程基于文件的计数的用时

$ python ex1_lock.py
Starting 4 process(es) to count to 4000
File is empty, starting to count from 0,
error: invalid literal for int() with base 10: ''
Expecting to see a count of 4000
count.txt contains:
4000
$ python -m timeit -s "import ex1_lock" "ex1_lock.run_workers()"
10 loops, best of 3: 401 msec per loop

使用lockfile只是增加了几行代码。首先,我们创建了一个FileLock对象,文件名可以是任意的,但使用和你要加锁的文件相同的名字让命令行调试变得更简单。当你请求得到锁时,FileLock用相同的名字打开了一个新文件,用.lock作为后缀名。

没有任何参数的acquire会无限期阻塞,直到锁变得可用。一旦你拿到了锁,你就能够做你的处理,而没有任何冲突风险。接着一旦你写完(例9-38),你就可以释放锁。

例9-38 有锁的工作函数

def work(filename, max_count):
  lock = lockfile.FileLock(filename)
  for n in range(max_count):
    lock.acquire()
    f = open(filename, "r")
    try:
      nbr = int(f.read())
    except ValueError as err:
      print "File is empty, starting to count from 0, error: " + str(err)
      nbr = 0
    f = open(filename, "w")
    f.write(str(nbr + 1) + '\n')
    f.close()
    lock.release()

你可以使用一个上下文管理器,在这种情况下,你用lock:来代替acquire和release。这给运行时增加了少量的开销,但是也让代码变得更容易读一点。清晰度常常优于执行速度。

你也能够用一个timeout来请求acquire锁,检查已经存在的锁,并打断已经存在的锁。提供了几种加锁机制,对每个平台敏感的默认选项隐藏在了Filelock接口后面。

9.7.2 给Value加锁

multiprocessing模块在进程间提供了几个选项来共享Python对象。我们能使用低开销的通信来共享基础对象,也能用一个Manager来共享更高级别的Python对象(例如,字典和列表)(但要注意同步开销会显著地减慢数据共享)。

在这里,我们将使用一个multiprocessing.Value对象在进程间共享一个整数。尽管Value有锁,但是锁没有尽如你意——它阻止了同时读取或写入,但是没有提供一个原子的递增。例9-39演示了这种情况。你能看到我们以一个不正确的计数来终结,这就类似于我们在之前看到的基于文件的不同步的例子。

例9-39 无锁导致计数不正确

$ python ex2_nolock.py
Expecting to see a count of 4000
We have counted to 2340
$ python -m timeit -s "import ex2_nolock" "ex2_nolock.run_workers()"
100 loops, best of 3: 12.6 msec per loop

数据没有发生损坏,但是我们错失了好几次更新。如果你从一个进程写入一个Value,再在另外的进程中消费那个Value(但不修改),这种方式就可能是合适的。

共享Value的代码显示在了例9-40中。我们不得不声明一个数据类型和一个初始值——使用Value(“i”, 0),我们请求一个初始值为0的有符号整数。它被当作一个常规参数传递给我们的Process对象,Process对象负责在后台进程间共享相同的字节块。为了访问由我们的Value所持有的基础对象,我们使用了.value。注意我们正请求一个原地的加法——我们期待它变为一个原子操作,但是Value却不支持,所以我们最终的计数比预期要低。

例9-40 没有锁的计数代码

import multiprocessing

def work(value, max_count):
  for n in range(max_count):
    value.value += 1

def run_workers():
...
  value = multiprocessing.Value('i', 0)
  for process_nbr in range(NBR_PROCESSES):
    p = multiprocessing.Process(target=work, args=(value, MAX_COUNT_PER_PROCESS))
    p.start()
    processes.append(p)
...

我们能够增加一个Lock,它就会以与我们之前所看到的FileLock例子很类似的方式来工作。你能在例9-41中看到正确同步后的计数。

例9-41 使用Lock来同步写一个Value

# lock on the update, but this isn't atomic
$ python ex2_lock.py
Expecting to see a count of 4000
We have counted to 4000
$ python -m timeit -s "import ex2_lock" "ex2_lock.run_workers()"
10 loops, best of 3: 22.2 msec per loop

在例9-42中,我们已经使用了一个上下文管理器(有Lock)来获取锁。就如在之前的FileLock例子中那样,它无限等待来获取锁。

例9-42 使用context manager来获取锁

import multiprocessing

def work(value, max_count, lock):
  for n in range(max_count):
    with lock:
      value.value += 1

def run_workers():
...
  processes = []
  lock = multiprocessing.Lock()
  value = multiprocessing.Value('i', 0)
  for process_nbr in range(NBR_PROCESSES):
    p = multiprocessing.Process(target=work,
                  args=(value, MAX_COUNT_PER_PROCESS, lock))
    p.start()
    processes.append(p)
...

就如在FileLock例子中的那样,避免使用上下文管理器会快一点。例9-43中的片段显示了怎样使用和释放Lock对象。

例9-43 内联加锁,而不用上下文管理器

lock.acquire()
value.value += 1
lock.release()

既然Lock没有给予我们所追求的细粒度,它提供的基础锁浪费了一点不必要的时间。我们能够如例9-44中的那样用一个RawValue取代Value,并取得一个递增的速度提升。如果你有兴趣看看在这个变化背后的字节码,那么就读一下Eli Bendersky关于这个主题的博客帖子。

例9-44 展示最快的RawValue和Lock方法的控制台输出

# RawValue has no lock on it
$ python ex2_lock_rawvalue.py
Expecting to see a count of 4000
We have counted to 4000
$ python -m timeit -s "import ex2_lock_rawvalue" "ex2_lock_rawvalue.run_workers()"
100 loops, best of 3: 12.6 msec per loop

为了使用RawValue,只要如例9-45中所示的那样和Value交换就可以了。

例9-45 使用RawValue整数的例子

...
def run_workers():
...
  lock = multiprocessing.Lock()
  value = multiprocessing.RawValue('i', 0)
  for process_nbr in range(NBR_PROCESSES):
    p = multiprocessing.Process(target=work,
                  args=(value, MAX_COUNT_PER_PROCESS, lock))
    p.start()
    processes.append(p)

如果我们要共享一个基础对象数组,我们也可以使用RawArray来代替一个multiprocessing.Array。

随着在多进程间共享一个标记和同步数据共享,我们已经看到了各种不同的方式来在一台单独的机器上的多进程间划分工作。然而,请记住数据共享能够产生令人头疼的问题——设法尽可能的避开它。让一台机器处理共享状态的所有边边角角的情况是困难的,当你第一次被迫调试多进程交互时,你就会意识到为什么为人所接受的智慧就是尽量避免这种情况。

确实要考虑写出运行慢一点但是更容易被你的团队所理解的代码。使用一个类似Redis的外部工具来共享状态会生成一个在运行时能够被非开发者所检查的系统——这是一种强大的方式来让你的团队监控在你的并行系统中所发生的事情。

一定要记住调试过性能的Python代码更不可能被你团队中的更初级的员工所理解——他们或害怕它,或会破坏它。避免这个问题(接受在速度上的牺牲)来保持团队的高效率。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文