返回介绍

9.4 为 CPU 密集型、阻塞或遗留功能建立接口

发布于 2024-01-30 22:48:37 字数 6787 浏览 0 评论 0 收藏 0

本章最后一节讨论的是访问大多数非Twisted的工作。尽管有高效的异步代码所带来的巨大收益,但为Twisted和Scrapy重写每个库,既不现实也不可行。使用Twisted的线程池和reactor.spawnProcess()方法,我们可以使用任何Python库甚至其他语言编写的二进制包。

9.4.1 处理CPU密集型或阻塞操作的管道

第8章讲到,reactor对于简短、非阻塞的任务非常理想。如果必须要执行一些更复杂或是涉及阻塞的事情,该怎么做呢?Twisted提供了线程池,可以使用reactor.callInThread() API调用,在一些线程中执行慢操作,而不是在主线程中执行(Twisted的reactor)。这就意味着reactor会持续运行其处理过程,并在计算发生时响应事件。请注意,在线程池中的处理不是线程安全的。这就是说当你使用全局状态时,又会出现多线程编程中所有的传统同步问题。让我们从该管道的一个简单版本起步,逐渐编写出完整的代码。

class UsingBlocking(object):
  @defer.inlineCallbacks
  def process_item(self, item, spider):
    price = item["price"][0]

    out = defer.Deferred()
    reactor.callInThread(self._do_calculation, price, out)
    item["price"][0] = yield out

    defer.returnValue(item)

  def _do_calculation(self, price, out):
    new_price = price + 1
    time.sleep(0.10)
    reactor.callFromThread(out.callback, new_price)

在前面的管道中,我们看到了实际运行的基本原语。对于每个Item,我们抽取其价格,并希望使用_do_calculation()方法处理它。该方法使用了一个阻塞操作time.sleep()。我们将使用reactor.callInThread()调用把它放到另一个线程中运行。其中,被调用的函数以及传给该函数的任意数量的参数将会作为参数。显然,我们不只传递了price,还创建并传递了一个名为out的延迟操作。当_do_ calculation()完成计算时,我们将使用out回调返回值。在下一步中,我们对这个延迟操作执行了yield处理,并为价格设置了新值,最后返回Item。

在_do_ calculation()中,注意到有一个简单的计算——价格自增1,然后是100毫秒的睡眠。这是非常多的时间,如果在reactor线程中调用,它将使我们每秒处理的页数无法超过10页。通过使其在其他线程中运行,就不再有这个问题了。任务将会在线程池中排队,等待出现可用的线程,一旦进入线程执行,该线程就将睡眠100毫秒。最后一步是触发out回调。正常情况下,可以使用out.callback(new_price),不过由于现在处于另一个线程中,这种方法不再安全。如果这样做,会导致延迟操作的代码和Scrapy的功能会从另一个线程调用,迟早会出现错误的数据。替代方案是使用reactor.callFromThread(),同样,也是将函数作为参数,并将任意数量的额外参数传到函数中。该函数将会排队,由reactor线程调用;而另一方面,会解除process_item()对象yield操作的阻塞,为该Item恢复Scrapy操作。

如果有全局状态(比如计数器、移动平均值等)的话,那么在_do_calculation()中使用它们会发生什么呢?例如,我们添加两个变量——beta和delta,如下所示。

class UsingBlocking(object):
  def __init__(self):
    self.beta, self.delta = 0, 0
  ...
  def _do_calculation(self, price, out):
    self.beta += 1
    time.sleep(0.001)
    self.delta += 1
    new_price = price + self.beta - self.delta + 1
    assert abs(new_price-price-1) < 0.01

    time.sleep(0.10)...

上面的代码存在问题,我们会得到断言错误。这是因为如果一个线程在self.beta和self.delta之间切换,而另一个线程使用这些beta/delta的值恢复计算价格,那么会发现它们处于不一致的状态(beta比delta大),因此,会计算出错误的结果。短暂的睡眠使该问题更容易产生,不过即便没有它,竞态条件也将很快出现。为了避免此类问题发生,必须使用锁,比如使用Python的threading.RLock()递归锁。当使用锁时,我们可以确信不会存在两个线程同时执行其保护的临界区的情况。

class UsingBlocking(object):
  def __init__(self):
    ...
    self.lock = threading.RLock()
  ...
  def _do_calculation(self, price, out):
    with self.lock:
      self.beta += 1
      ...
      new_price = price + self.beta - self.delta + 1

    assert abs(new_price-price-1) < 0.01 ...

前面的代码现在是正确的。请记住我们并不需要保护整段代码,只需覆盖全局状态的使用就够了。

本示例的完整代码位于ch09/properties/p``roperties/pipelines/computation.py文件中。

要想使用该管道,只需在settings.py文件中将其添加到ITEM_PIPELINES设置即可,如下所示。

ITEM_PIPELINES = { ...
  'properties.pipelines.computation.UsingBlocking': 500,

可以按照平时那样运行该爬虫。按照预期,管道延时显著增长了100毫秒,不过我们惊喜地发现吞吐量几乎保持不变,即每秒25个item左右。

9.4.2 使用二进制或脚本的管道

对于一个遗留功能来说,最不可知的接口就是独立的可执行程序或脚本。它可能需要几秒钟时间启动(比如从数据库中加载数据),不过在这之后,它可能会在一小段延时内处理许多值。即使对于这种情况,Twisted仍然能够覆盖。我们可以使用reactor.spawnProcess()API以及相关的protocol.ProcessProtocol运行任何类型的可执行程序。来看一个例子,该示例的脚本如下所示。

#!/bin/bash
trap "" SIGINT
sleep 3

while read line
do
  # 4 per second
  sleep 0.25
  awk "BEGIN {print 1.20 * $line}"
done

这是一个简单的bash脚本。当它启动后,会禁用Ctrl + C。这是为了解决Ctrl + C派生到子进程后过早终止,导致Scrapy自身无法停止,无限等待子进程返回结果的系统特性。禁用Ctrl + C后,脚本将会睡眠3秒钟,以模拟启动时间。然后脚本会从输入中读取行,等待250毫秒,再返回结果价格,该计算使用Linux的awk命令将原值乘以1.2倍。该脚本的最大吞吐量是每秒4个Item。可以使用一个简短的会话对其进行测试,如下所示。

$ properties/pipelines/legacy.sh
12 <- If you type this quickly you will wait ~3 seconds to get results
14.40
13 <- For further numbers you will notice just a slight delay
15.60

由于Ctrl + C被禁用,我们必须使用Ctrl + D终止会话。不错!那么,我们要如何在Scrapy中使用该脚本呢?仍然从一个简化的版本起步。

class CommandSlot(protocol.ProcessProtocol):
  def __init__(self, args):
    self._queue = []
    reactor.spawnProcess(self, args[0], args)

  def legacy_calculate(self, price):
    d = defer.Deferred()
    self._queue.append(d)
    self.transport.write("%f\n" % price)
    return d

  # Overriding from protocol.ProcessProtocol
  def outReceived(self, data):
    """Called when new output is received"""
    self._queue.pop(0).callback(float(data))

class Pricing(object):
  def __init__(self):
    self.slot = CommandSlot(['properties/pipelines/legacy.sh'])

  @defer.inlineCallbacks
  def process_item(self, item, spider):
    item["price"][0] = yield self.slot.legacy_calculate(item["price"][0])
    defer.returnValue(item)

我们可以在这里找到名为CommandSlot的ProcessProtocol的定义,以及Pricing爬虫。在__init__()中,我们创建了新的CommandSlot,其构造方法初始化了一个空队列,并使用reactor.spawnProcess()启动了一个新的进程。该调用将从进程中传输和接收数据的ProcessProtocol作为第一个参数。在本例中,该值为self,因为spawnProcess()是在protocol类中进行调用的。第二个参数是可执行程序的名称。第三个参数args将该二进制程序的所有命令行参数作为字符串列表保留。

在管道的process_item()中,基本上将所有工作都委托给CommandSlot的legacy_calculate()方法,它将返回一个延迟操作,并执行yield操作。legacy_calculate()创建了一个延迟操作,使其排队,然后使用transport.write()将价格写入到进程当中。transport由ProcessProtocol提供,用于让我们和进程进行通信。无论我们何时从进程中接收到数据,都会调用outReceived()。通过延迟操作排队,以及按顺序处理的shell脚本,我们可以从队列中只弹出最旧的延迟操作,使用接收到的值触发它。到此为止。我们可以通过在ITEM_PIPELINES中添加它的方式,启动该管道,并像平时那样运行。

ITEM_PIPELINES = {...
  'properties.pipelines.legacy.Pricing': 600,

如果我们运行一次,就会发现其性能非常糟糕。如我们所料,我们的处理成为瓶颈,限制了吞吐量只能达到每秒4个Item。要想增长吞吐量,我们所能做的就是对管道进行一些修改,允许该类并行运行多个,如下所示。

class Pricing(object):
  def __init__(self):
    self.concurrency = 16
    args = ['properties/pipelines/legacy.sh']
    self.slots = [CommandSlot(args)
           for i in xrange(self.concurrency)]
    self.rr = 0

  @defer.inlineCallbacks
  def process_item(self, item, spider):
    slot = self.slots[self.rr]
    self.rr = (self.rr + 1) % self.concurrency
    item["price"][0] = yield
             slot.legacy_calculate(item["price"][0])
    defer.returnValue(item)

我们将其修改为启动16个实例,并以轮询的方式为每个实例发送价格。该管道现在提供了每秒16×4 = 64个item的吞吐量。我们可以通过一个快速爬取来确认,如下所示。

$ scrapy crawl easy -s CLOSESPIDER_ITEMCOUNT=1000
...
Scraped... 0.0 items/s, avg latency: 0.00 s and avg time in pipelines: 
0.00 s
Scraped... 21.0 items/s, avg latency: 2.20 s and avg time in pipelines: 
1.48 s
Scraped... 24.2 items/s, avg latency: 1.16 s and avg time in pipelines: 
0.52 s

延时和预期一样,增长到250毫秒,不过吞吐量仍然是每秒25个item。

请注意,前面的方法中使用了transport.write()将shell脚本输入中的所有价格排入队列。对于你的应用而言,这种方式可能合适,也可能不合适,尤其是当它使用了更多的数据而不仅仅是几个数字时。本例完整代码会将所有值和回调排入队列,并且只有在前一次结果被接收后,才会向脚本发送新值。你会发现这种方式对你的遗留应用更加友好,不过也增添了一些复杂度。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文