NSQ golang MQ 实时的分布式消息处理平台
关于本文
本文建立在通过解读 NSQ 源码 来提升 golang 的编写能力。
对于阅读某个项目的源码,本人通常按以下流程开展:
- 充分理解项目的使用场景和功能;
- 对比其在同类项目中的优缺点;
- 大体构思实现该项目需要什么模块;
- 阅读源码。
当然过程不是一成不变的,对项目的理解起初往往是从外界获取,在阅读源码的过程中会有一些自己的理解融汇进去;同时源码也不是一遍就可以看明白的,这个过程需要反复折回。
在本文中,分析对象是以 golang 为主编程语言而开发的 Message Queue:NSQ,通过上面提到的过程来进行剖析。网上已有不少 NSQ 的说明及代码解释文档,我会对其进行参考,也会将参考源地址做好引用。
what’s MQ
关于 MQ 的解释 google 就可以找出很多,根据使用场景和经历不同,对 MQ 的理解不同。我之前项目中使用 MQ 场景比较简单,就是接收日志,并且没有用到多节点拓扑功能。简单理解一下,应该是有一个 http 的 writer 的端,一个内部的 reader 端。客户端将日志上传到 writer 之后无需等待服务器处理,直接收到一个成功反馈,服务端在空闲时读取 reader 端,将队列中的日志进行处理的一种生产者-消费者模型。
对我来说,MQ 的主要作用是提供一个存放消息的 area,降低各组件间的耦合度。
但是完整的 MQ 需要 有多节点拓扑
、 内部+外部的 api+http 端口
、 实时及延迟消费机制
和 高通量读写稳定
等功能,另外消息的有序无需等特点则需要根据使用场景而定。
What‘s NSQ
NSQ 简介
NSQ 官方文档 中介绍 NSQ 的设计理念及 NSQ 特点。 极客学院的 wiki 中提供了中文文档。
NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。
NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。
NSQ 工具分为三大组件:
nsqd
nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它在 2 个 TCP 端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。
nsqlookupd
nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者、 nsqd 节点广播的话题(topic)和通道(channel)信息。
有两个接口:nsqd 用来广播的 TCP 接口和客户端及 web 监控界面的 HTTP 接口。
nsqadmin
nsqadmin 是一套 WEB UI,用来汇集并监控显示集群的实时统计,对 topic 执行部分管理任务。
utilities
nsq 的周边工具,对 nsq 的流量管理、安全和功能的一些补充管理。
官方工具包括: nsq_stat
、 nsq_tail
、 nsq_to_file
、 nsq_to_http
、 nsq_to_nsq
和 to_nsq
等。
NSQ 关键字解析
Topic & channel
topic,话题,是一个独特的数据流,通道(channel) 是消费者订阅了某个话题的逻辑分组。
单个 nsqd 可以有很多的话题,每个话题可以有多通道。一个通道接收到一个话题中所有消息的副本,启用组播方式的传输,使消息同时在每个通道的所有订阅用户间分发,从而实现负载均衡。
SPOF(单点故障)
NSQ 中的 SPOF 指的是消息从生产者中接收到交付给消费者的这个流程中,任何一个单一环节出现故障。
NSQ 设计理念是 分布式
和 去中心化
的拓扑结构,保证信息将至少交付一次。根本原理是:
- 客户(消息接收体)表示他们已经准备好接收消息;
- NSQ 发送一条消息,并暂时将数据存储在本地做备份;
- 客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队该消息。
即交付消息的三种结果中,不会出现消息无缘丢失的事件。而丢失消息的情况只能是 nsqd 端出现问题(如非正常结束,且消息并未备份至磁盘)。因此,针对这个问题,增加 nsqd 的节点可以做一定程度的预防。
关于某个功能模块的特性,在模块分析部分再做详细剖析。
NSQ 使用体验
installing
NSQ 的 官方安装指南 ,建议使用 go get
命令,将 NSQ 项目源码和依赖库都下到本地方便阅读。
配置完成后,执行项目目录下的“./test.sh”脚本,可生成 NSQ 的二进制执行文件。
执行测试
服务端测试搭建
1.执行 NSQ 拓扑监控守护进程: nsqlookupd
nsqlookupd
默认情况下,执行如下命令行参数:
-http-address="0.0.0.0:4161": <addr>:<port>`监听 HTTP 客户端
-tcp-address="0.0.0.0:4160": TCP 客户端监听的 <addr>:<port>
其中,TCP 端用于监听 nsqd 挂载。
2.另起终端,执行 nsqd,并将其挂载到启动后的 nsqlookupd 监控端口上。
nsqd --lookupd-tcp-address=127.0.0.1:4160
注:nsqd 没有对 nsqlookupd 的 HTTP 端口挂载。
默认情况下,端口 4151
为 HTTP 监听端口, 4150
为 TCP 监听端口,向这两个端口发送消息,则会进入 NSQD 消息队列。
3.开第三个终端,启动 NSQ 状态监控的 WEB UI
nsqadmin --lookupd-http-address=127.0.0.1:4161
此时 web 监控 UI:nsqadmin 挂载的是 nsqlookupd 的 HTTP 端口地址。显示地址为 http://127.0.0.1:4171/
4.开第四个终端,通过 curl
命令执行信息发布。
curl -d "hello world 1" "http://127.0.0.1:4151/put?topic=test"
注:NSQ 的 topic 命名,是以该话题的第一个 PUB
或 SUB
来确定的,并将其创建在 nsqd 中。同时话题的元数据将会传播给 nsqlookupd 的配置。其他的 reader 将会通过周期性的查询 nsqlookupd 来发现这个话题。在该命令执行后,会在第二步中执行的 nsqd 中创建一个名为 test
的 topic。
curl -d
是将信息以 POST
的形式,发送到 4151 端口。
客户端(消费者)获取消息测试
简单的服务端架构已经搭建完成,此时打开监控界面可以看到刚刚发布的消息,一条名为“test”的 topic,点开后, message
和 channel
均为 1。
创建一个消费者来执行消息读取测试:
nsq_to_file --topic=test --output-dir=tmp --lookupd-http-address=127.0.0.1:4161
此时 nsq_to_file
便充当了消费者,将消息读取后,存放于目录 tmp
中的时间戳文件,打开文件会看到我们之前发送的消息体: hello world 1
。
web UI 中也会显示消费者客户端 nsq_to_file
和对应的接收的信息数及连接状态。
如果同时开启多个 nsq_to_file
,则只有一个 nsq_to_file
会接收到新的消息,因为消息一旦交付成功后将会失效,无法被其他消费者接收。
nsq_to_file 只是一个简单的消费者客户端,消息的订阅可以使用官方开发的 go lib 客户端: go-ndq 的 read 和 write 功能。
NSQ 代码阅读
NSQ 分三大模块:nsqd、nsqlookupd、nsqadmin,从哪个开始着手很重要。
本人阅读源码习惯从功能少的、尽量闭环的程序着手。纵观上面三个模块,最提纲挈领,最有桥梁作用的应该是 nsqlookupd
,但 nsqlookupd 仍然有很多的功能开发,在没有搞清楚其功能之前,不适合直接阅读。因此,我把第一个阅读的源码定为 nsq-to-file
。
nsq-to-file
注意:本地源码地址为: github.com/nsqio/nsq/apps/nsq_to_file
nsq-to-file 作为 NSQ 的一种客户端,其实是在 go-nsq 的基础上做的功能开发。
代码
两个文件: nsq_to_file.go
和 strftime.go
,均为 main
package。
nsq_to_file.go
先找到 main
函数
func main() {
cfg := nsq.NewConfig()
// TODO: remove, deprecated
flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")
flag.Parse()
第一个函数调用 flag.Parse()
是 golang 标准库中的命令行参数解析,可以追出允许使用什么命令行参数。
nsq_to_file flag 参数功能解析
nsq_to_file 肚子 opt 参数设置:
var (
showVersion = flag.Bool("version", false, "print version string")
channel = flag.String("channel", "nsq_to_file", "nsq channel")
maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")
outputDir = flag.String("output-dir", "/tmp", "directory to write output files to")
datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
gzipLevel = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
gzipEnabled = flag.Bool("gzip", false, "gzip output files.")
skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")
topicPollRate = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
topicPattern = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")
rotateSize = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")
nsqdTCPAddrs = app.StringArray{}
lookupdHTTPAddrs = app.StringArray{}
topics = app.StringArray{}
// TODO: remove, deprecated
gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)
consumer-opt
最近两个是 reader-opt
和 consumer-opt
,其实功能相同,建议使用 consumer-opt。
这个命令行功能是为 go-nsq 设置 config 参数,因为 nsq_to_file 是在其基础上开发出来的。而且参数说明中也说了,有可能已经被设置好了的。
以下是 config
结构体的定义(文件: github.com/nsqio/go-nsq/config.go):
type Config struct {
initialized bool
// used to Initialize, Validate
configHandlers []configHandler
DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`
// Deadlines for network reads and writes
ReadTimeout time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`
// LocalAddr is the local address to use when dialing an nsqd.
// If empty, a local address is automatically chosen.
LocalAddr net.Addr `opt:"local_addr"`
// Duration between polling lookupd for new producers, and fractional jitter to add to
// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
// restart at the same time
//
// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
// reconnection attempts
LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
LookupdPollJitter float64 `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`
// Maximum duration when REQueueing (for doubling of deferred requeue)
MaxRequeueDelay time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`
// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
// Maximum amount of time to backoff when processing fails 0 == no backoff
MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
// Unit of time for calculating consumer backoff
BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`
// Maximum number of times this consumer will attempt to process a message before giving up
MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`
// Duration to wait for a message from a producer when in a state where RDY
// counts are re-distributed (ie. max_in_flight < num_producers)
LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`
// Duration between redistributing max-in-flight to connections
RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`
// Identifiers sent to nsqd representing this client
// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
ClientID string `opt:"client_id"` // (defaults: short hostname)
Hostname string `opt:"hostname"`
UserAgent string `opt:"user_agent"`
// Duration of time between heartbeats. This must be less than ReadTimeout
HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
// Integer percentage to sample the channel (requires nsqd 0.2.25+)
SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`
// To set TLS config, use the following options:
//
// tls_v1 - Bool enable TLS negotiation
// tls_root_ca_file - String path to file containing root CA
// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
// tls_cert - String path to file containing public key for certificate
// tls_key - String path to file containing private key for certificate
// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
//
TlsV1 bool `opt:"tls_v1"`
TlsConfig *tls.Config `opt:"tls_config"`
// Compression Settings
Deflate bool `opt:"deflate"`
DeflateLevel int `opt:"deflate_level" min:"1" max:"9" default:"6"`
Snappy bool `opt:"snappy"`
// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
//
// WARNING: configuring clients with an extremely low
// (< 25ms) output_buffer_timeout has a significant effect
// on nsqd CPU usage (particularly with > 50 clients connected).
OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`
// Maximum number of messages to allow in flight (concurrency knob)
MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`
// The server-side message timeout for messages delivered to this client
MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`
// secret for nsqd authentication (requires nsqd 0.2.29+)
AuthSecret string `opt:"auth_secret"`
}
从这里看出,使用 golang 默认标识的方式,已经设置好了参数的 key(即 opt
后的值)和 default 值了。暂且不看各功能的含义(太多了),我们先看如何设置。
原文解释:
The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).
Use Set(option string, value interface{}) as an alternate way to set parameters.
Set 的代码:
// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
parts := strings.SplitN(opt, ",", 2)
key := parts[0]
switch len(parts) {
case 1:
// default options specified without a value to boolean true
err = c.Config.Set(key, true)
case 2:
err = c.Config.Set(key, parts[1])
}
return
}
可以看出,对命令行的要求是“key,value”,即使用“,”分隔。
如设置 timeout 的启动命令:
./nsq_to_file --lookupd-http-address=127.0.0.1:4161 --consumer-opt="read_timeout,100s"
继续 main 函数代码阅读
channel
if *showVersion { //显示版本信息
fmt.Printf("nsq_to_file v%s\n", version.Binary)
return
}
if *channel == "" { //要求必须设置 channel 名称
log.Fatal("--channel is required")
}
showVersion
没什么好说的,关键是 channel
opt。
通过之前的准备工作,了解到 NSQ 消息分发是无单点的分布式拓扑结构,其中 Topic 和 channel 是其重要的组成单元,一个 Topic 下的信息会同时发送给不同的 channel 中,而同一个 channel 下的多个消费者中,只有一个可以获得此 channel 下的消息。
做一个测试:
再开启一个消费者终端,与之前不同的是,此终端监听 Channel 为"test-channel":
nsq_to_file --lookupd-http-address=127.0.0.1:4161 --channel="test-channel" --output-dir=tmp
向 NSQ-http 端口发送消息:
curl -d "hello world test5" "http://127.0.0.1:4151/put?topic=test"
可以看到输出文件中,有两行 hello world test5
,这是因为 channel nsq_to_file
(默认 channel)和 test-channel
中都会有此条消息,而虽然 channel nsq_to_file
中有三个消费者在等待接收,但只有一个消费者获取到,其余继续等待。 test-channel
中的消费者获取信息后,也会写到/tmp 目录下的文件中,因此两条消息均会被记录。
明白 channel 的作用后,在继续阅读 channel 的使用代码之前,将 main 后面几个同类型 opt 判定也看完:
var topicsFromNSQLookupd bool //判断消息来源是否为 NSQ-LOOKUPD
if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
}
if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 {
log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
}
//要求必须从 TCP 端口与 HTTP 端口中必须且只能监听其中一个,不可同时监听,也不可都不监听
if *gzipLevel < 1 || *gzipLevel > 9 {
log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
}
// TODO: remove, deprecated
if hasArg("gzip-compression") {
log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
switch *gzipCompression {
case 1:
*gzipLevel = gzip.BestSpeed
case 2:
*gzipLevel = gzip.BestCompression
case 3:
*gzipLevel = gzip.DefaultCompression
default:
log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
}
} //根据之前设置的压缩等级进行压缩
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight // 允许缓存的消息条数上限
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: golang 处理 Linux 系统信号
下一篇: MyBatis 介绍和使用
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论