- 内容提要
- 前言
- 作者简介
- 封面简介
- 第1章 理解高性能 Python
- 第2章 通过性能分析找到瓶颈
- 2.1 高效地分析性能
- 2.2 Julia 集合的介绍
- 2.3 计算完整的 Julia 集合
- 2.4 计时的简单方法——打印和修饰
- 2.5 用 UNIX 的 time 命令进行简单的计时
- 2.6 使用 cProfile 模块
- 2.7 用 runsnakerun 对 cProfile 的输出进行可视化
- 2.8 用 line_profiler 进行逐行分析
- 2.9 用 memory_profiler 诊断内存的用量
- 2.10 用 heapy 调查堆上的对象
- 2.11 用 dowser 实时画出变量的实例
- 2.12 用 dis 模块检查 CPython 字节码
- 2.13 在优化期间进行单元测试保持代码的正确性
- 2.14 确保性能分析成功的策略
- 2.15 小结
- 第3章 列表和元组
- 第4章 字典和集合
- 第5章 迭代器和生成器
- 第6章 矩阵和矢量计算
- 第7章 编译成 C
- 第8章 并发
- 第9章 multiprocessing 模块
- 第10章 集群和工作队列
- 第11章 使用更少的 RAM
- 第12章 现场教训
10.7 为鲁棒生产集群的 NSQ
在生产环境中,你需要远比我们所谈到的其他解决方案更健壮的解决方案。这是因为在你的集群每天运营期间,节点可能变得不可用,代码可能会崩溃掉,网络可能会断线,或者在其他数以千计会发生的问题中,其中的一个就可能发生了。问题就在于以前所有的系统有一台计算机来发布命令,还有有限和静态数量的计算机来读取并执行命令。我们宁愿用一个使用消息总线的多角色(actor)的系统来取而代之——这将允许我们具有数量任意和经常变化的消息创建者和消息消费者。
针对这些问题的一个简单解决方案就是NSQ,一个高性能的分布式消息平台。尽管它是用GO编写的,但它是完全与数据格式和语言无关的。结果就是有很多语言写成的库,访问NSQ的基本接口是只需要能够创建HTTP调用的REST API。而且,我们能够用想要的任何格式来传送消息:JSON、Pickel、msgpack等。无论如何,最重要的是,它提供了关于消息递送的基本保证,并且它使用了两个简单的设计模式来做好了一切:队列和发布者/订阅者模式。
10.7.1 队列
队列是一种消息的缓存类型。无论何时,当你想把消息传送给处理管道的另一端时,你把它发送到队列,它会在队列里等待直到有可用的工作者来读取它。当生产和消费之间存在不平衡时,队列在分布式处理中是最有用的。如果发生了不平衡,我们仅仅通过添加更多的数据消费者即可水平扩展,直到消息生产的速率等于消费的速率。另外,如果负责消费消息的计算机下线了,消息不会丢失,只是在队列中排队,直到出现可用的消费者,这样就给了我们消息递送的保证。
例如,假设我们想要在用户每次给我们站点的商品评分的时候,给用户处理新的推荐。如果我们没有队列,那么“评分”的行为会直接调用“重新计算推荐”的行为,而不管服务器正拼命忙于处理推荐。如果突然间数以千计的用户决定给某件商品评分,我们的推荐服务器就可能会疲于应付这些请求,它们就可能会开始超时,丢弃消息,通常变得失去响应!
另一方面,当任务准备好时,推荐服务器使用队列来请求更多的任务。一个新的“评分”行为会把一个新任务放入队列,当推荐服务器准备做更多工作时,它会从队列中抓取任务来处理。在这个设定中,如果比平常更多的用户开始给商品评分,我们的队列将会塞满,对于推荐服务器来说它的行为就像是一个缓存——它们的工作负载将不受影响,它们还会处理消息,直到队列变空。
随之而来的一个潜在的问题就是如果队列完全被任务搞得不堪重负,它将会存储相当多的消息。NSQ通过多个存储后端来解决这个问题——当没有许多消息时,它们保存在内存中;当更多的消息开始进来时,把消息放置进磁盘中。
备忘
一般来说,当使用队列系统来工作时,设法让下流的系统(例如,前面例子中的推荐系统)处于正常工作负载60%的容量是一个好主意。在给问题分配足够多的资源与当工作量增加到超出正常水平时给你的服务器充足的额外能力之间进行权衡,这是一个良好的妥协。
10.7.2 发布者/订阅者
另一方面,pub/sub(发布者/订阅者的简称)描述了谁来得到哪些消息。数据发布者能够推送关于特定主题的数据,而数据订阅者注册不同的数据源。无论发布者何时发放信息,它都发送给所有的订阅者——它们各自得到原始信息的一份完全相同的拷贝。你可以把它想象成报纸:许多人能够订阅特定的报纸,无论新版本的报纸何时出来,每一个订阅者都得到一份完全相同的拷贝。另外,报纸的生产者完全不需要知道报纸要发送给的人群。结果就是,发布者和订阅者相互解耦了,当我们的网络发生变化,还处于生产环境中时,让我们的系统变得更健壮。
除此之外,NSQ增加了数据消费者的概念,那就是,多个进程能够连接到相同的数据发布。无论新的数据何时出来,订阅者都得到一份数据拷贝。无论怎样,每个订阅只有一个消费者看到了数据。在与报纸的类比中,你可以把它想象成让多名阅读报纸的人处于相同的家庭中。发布者将把一份报纸递送到家中,既然家庭只订阅了一次,在家中谁先拿到报纸谁就可以阅读数据。当每一个发布者的消费者看到消息时,对消息做相同的处理。无论如何,它们可以悄悄地在多台计算机上,这样就更增强了整个计算池的处理能力。
我们在图10-1中可以看到对发布者/处理者模式的描述。如果一条关于“点击”主题的新消息发布出来了,所有的订阅者(或者,用NSQ的术语来说,就是通道——例如,“指标”、“作弊分析”,以及“打包”)将得到一份拷贝。每个订阅者由一个或多个消费者所组成,代表对响应消息的实际处理。在“指标”订阅者的情况下,只有一个消费者会看到新消息。下一条消息将到另一个消费者那去,依次类推。
在潜在的大规模的消费者池中传播消息的好处就是实质上做自动的负载均衡。如果一条消息要花费很长的时间来处理,消费者直到处理完成后,才会发信号给NSQ表示自己已准备好接受更多的消息,这样其他消费者将获得以后的大部分消息(直到原来的消费者做好了再次处理的准备)。另外,它允许已经存在的消费者断开连接(无论是自主选择还是因为失效),还允许新的消费者连接到集群,然而又保持了在特定订阅组中的处理能力。例如,如果我们发现“指标”要花费相当长的时间来处理,而且常常跟不上需求,我们就能够仅仅为订阅组添加更多的进程给消费者池,以便给予我们更多的处理能力。另一方面,如果我们看到大多数进程处于空闲(例如,没有得到任何消息),我们能够轻易地从这个订阅组中移除消费者。
图10-1 NSQ的发布者/订阅者拓扑图
注意到无论是谁都能发布数据也很重要。消费者不仅仅一定是消费者——它可以从一个主题消费数据,接着发布另一个主题。事实上,当涉及分布式计算这种范式时,这条链是一个重要的工作流。消费者读取一个主题的数据,以某种方式转换数据,接着发布关于一个新主题的数据,而其他消费者能够进一步转换它。凭借这种方式,不同的主题代表不同的数据,订阅组代表对数据的不同转换,而消费者就是转换个体消息的实际工作者。
而且,在这个系统中存在极大的冗余。可以有许多nsqd进程让每个消费者连接上,可以有许多消费者连接到一个特定的订阅上。这样就没有单点失效问题,即使几台机器下线了,你的系统还是鲁棒的。我们可以看到在图10-2中,即使图表中的一台计算机下线了,系统还是能够投递和处理消息。另外,既然NSQ在关闭时把挂起的消息存储到了磁盘中,除非硬件失效是致命的灾难,否则你的数据还是非常有可能被完好无缺地投递。最后,如果消费者在响应一条特定的消息前关机了,NSQ将会把消息重新发送给另外一个消费者。这意味着即使有多个消费者关机了,我们知道一个主题的所有消息将至少得到一次响应[1]。
图10-2 NSQ的连接拓扑
10.7.3 分布式素数计算器
使用NSQ的代码一般是异步[3]。
就如我们之前所说的,像这样来做CPU密集型的工作有很多好处。首先,我们得到了完全鲁棒性的保证,这可能对这个项目有用,也可能无用。无论如何,最重要的是,我们得到了自动的负载均衡。这意味着如果一个消费者得到了一个花费特别长时间处理的数字,另一个消费者就会上手。
我们通过创建一个具有主题和所声明的订阅组(可以在例10-10的末尾看到)的nsq.Reader对象来创建一个消费者。我们也必须声明运行nsqd实例的位置(或者nsqlookupd实例,我们在本节中不会接触到)。另外,我们声明一个handler,这只是一个函数,对来自主题的每一条消息,它都会被调用到。为创建一个生产者,我们创建了一个nsq.Writer对象,并声明了一个或更多的nsqd实例要写入的位置。这样就给了我们异步写入nsq的能力,只要声明主题名字和消息即可[4]。
例10-10 使用NSQ的分布式素数计算
import nsq from tornado import gen from functools import partial import ujson as json @gen.coroutine def write_message(topic, data, writer): response = yield gen.Task(writer.pub, topic, data) # ❶ if isinstance(response, nsq.Error): print "Error with Message: {}: {}".format(data, response) yield write_message(data, writer) else: print "Published Message: ", data def calculate_prime(message, writer): message.enable_async() # ❷ data = json.loads(message.body) prime = is_prime(data["number"]) data["prime"] = prime if prime: topic = 'primes' else: topic = 'non_primes' output_message = json.dumps(data) write_message(topic, output_message, writer) message.finish() # ❸ if __name__ == "__main__": writer = nsq.Writer(['127.0.0.1:4150', ]) handler = partial(calculate_prime, writer=writer) reader = nsq.Reader( message_handler = handler, nsqd_tcp_addresses = ['127.0.0.1:4150', ], topic = 'numbers', channel = 'worker_group_a', ) nsq.run()
❶ 我们将异步地把结果写入一个新主题,如果因某种原因失败了就重新写。
❷ 通过在消息上使async生效,我们能够在处理消息时执行异步的操作。
❸ 使用async-enabled消息,我们在处理完消息时必须给NSQ发信号。
为了设置NSQ生态系统,我们将在本地机器上启动一个nsqd的实例:
$ nsqd 2014/05/10 16:48:42 nsqd v0.2.27 (built w/go1.2.1) 2014/05/10 16:48:42 worker id 382 2014/05/10 16:48:42 NSQ: persisting topic/channel metadata to nsqd.382.dat 2014/05/10 16:48:42 TCP: listening on [::]:4150 2014/05/10 16:48:42 HTTP: listening on [::]:4151
现在,我们可以启动我们所想要的数量的Python代码的实例(例10-10)。事实上,我们可以让这些实例运行在其他计算机上,只要在nsq.Reader实例化中对nsqd_tcp_address的引用还是合法的。这些消费者将连接到nsqd,并等待关于numbers主题的消息被发布出来。
数据发布到numbers主题上有很多种办法。既然了解到掌握一个系统的方式要经历一段长久的过程才理解怎样合适地处理它,我们将使用命令行工具来做。我们可以仅仅使用HTTP接口来给主题发布消息:
$ for i in `seq 10000` > do > echo {\"number\": $i} | curl -d@- "http://127.0.0.1:4151/pub?topic=numbers" > done
当这个命令开始运行时,我们以不同的数字在其中发布消息给numbers主题。同时,所有的生产者将开始输出状态消息,表明它们已经看见并处理了消息。另外,这些数字或发布给primes主题,或发布给non_primes主题。这允许我们有其他的数据消费者连接到这些主题中的任意一个来得到一个过滤我们原始数据的子集。例如,一个只需要素数的应用可以只连接到primes主题而总是有新的素数来为它自己的运算。我们可以通过使用nsqd的statsHTTP端点来观看我们运算的状态:
$ curl "http://127.0.0.1:4151/stats" nsqd v0.2.27 (built w/go1.2.1) [numbers ] depth: 0 be-depth: 0 msgs: 3060 e2e%: [worker_group_a ] depth: 1785 be-depth: 0 inflt: 1 def: 0 re-q: 0 timeout: 0 msgs: 3060 e2e%: [V2 muon:55915 ] state: 3 inflt: 1 rdy: 0 fin: 1469 re-q: 0 msgs: 1469 connected: 24s [primes ] depth: 195 be-depth: 0 msgs: 1274 e2e%: [non_primes ] depth: 1274 be-depth: 0 msgs: 1274 e2e%:
我们在这儿能看到numbers主题有一个订阅组work_group_a和一个消费者。另外,订阅组有一个长达1785条消息的大纵深,这意味着我们把消息放入NSQ的速度超过我们能够处理的速度。这个迹象表明要增加更多的消费者,这样我们就有更多的处理能力来应对更多的消息。而且,我们可以看到这个特定的消费者已经连接了24秒,已经处理了1469条消息,并且当前还有1条消息正在处理中。这个状态端点给出了大量信息来调试你的NSQ设置!最后,我们来看看primes和non_primes主题,它们没有订阅者或者消费者。这意味着消息将会存储起来直到有一个订阅者过来请求数据。
备忘
在生产系统中,你甚至能使用更加强大的工具nsqadmin,它提供了一个web接口,具有很详细的关于所有主题/订阅者以及消费者的概况。另外,它允许你轻易地暂停以及删除订阅者和主题。
为了实际观看这些消息,我们将为primes主题(或者non_primes)创建一个新消费者,只是把结果打包进一个文件或者数据库。或者,我们可以使用nsq_tail工具来看看数据包含了哪些内容:
$ nsq_tail --topic primes --nsqd-tcp-address=127.0.0.1:4150 2014/05/10 17:05:33 starting Handler go-routine 2014/05/10 17:05:33 [127.0.0.1:4150] connecting to nsqd 2014/05/10 17:05:33 [127.0.0.1:4150] IDENTIFY response: {MaxRdyCount:2500 TLSv1:false Deflate:false Snappy:false} {"prime":true,"number":5} {"prime":true,"number":7} {"prime":true,"number":11} {"prime":true,"number":13} {"prime":true,"number":17}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论