NSQ golang MQ 实时的分布式消息处理平台

发布于 2024-08-26 06:14:08 字数 16500 浏览 24 评论 0

关于本文

本文建立在通过解读 NSQ 源码 来提升 golang 的编写能力。

对于阅读某个项目的源码,本人通常按以下流程开展:

  • 充分理解项目的使用场景和功能;
  • 对比其在同类项目中的优缺点;
  • 大体构思实现该项目需要什么模块;
  • 阅读源码。

当然过程不是一成不变的,对项目的理解起初往往是从外界获取,在阅读源码的过程中会有一些自己的理解融汇进去;同时源码也不是一遍就可以看明白的,这个过程需要反复折回。

在本文中,分析对象是以 golang 为主编程语言而开发的 Message Queue:NSQ,通过上面提到的过程来进行剖析。网上已有不少 NSQ 的说明及代码解释文档,我会对其进行参考,也会将参考源地址做好引用。

what’s MQ

关于 MQ 的解释 google 就可以找出很多,根据使用场景和经历不同,对 MQ 的理解不同。我之前项目中使用 MQ 场景比较简单,就是接收日志,并且没有用到多节点拓扑功能。简单理解一下,应该是有一个 http 的 writer 的端,一个内部的 reader 端。客户端将日志上传到 writer 之后无需等待服务器处理,直接收到一个成功反馈,服务端在空闲时读取 reader 端,将队列中的日志进行处理的一种生产者-消费者模型。

对我来说,MQ 的主要作用是提供一个存放消息的 area,降低各组件间的耦合度。

但是完整的 MQ 需要 有多节点拓扑内部+外部的 api+http 端口实时及延迟消费机制高通量读写稳定 等功能,另外消息的有序无需等特点则需要根据使用场景而定。

What‘s NSQ

NSQ 简介

NSQ 官方文档 中介绍 NSQ 的设计理念及 NSQ 特点。 极客学院的 wiki 中提供了中文文档。

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

NSQ 工具分为三大组件:

nsqd

nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它在 2 个 TCP 端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。

nsqlookupd

nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者、 nsqd 节点广播的话题(topic)和通道(channel)信息。

有两个接口:nsqd 用来广播的 TCP 接口和客户端及 web 监控界面的 HTTP 接口。

nsqadmin

nsqadmin 是一套 WEB UI,用来汇集并监控显示集群的实时统计,对 topic 执行部分管理任务。

utilities

nsq 的周边工具,对 nsq 的流量管理、安全和功能的一些补充管理。
官方工具包括: nsq_statnsq_tailnsq_to_filensq_to_httpnsq_to_nsqto_nsq 等。

NSQ 关键字解析

Topic & channel

topic,话题,是一个独特的数据流,通道(channel) 是消费者订阅了某个话题的逻辑分组。

单个 nsqd 可以有很多的话题,每个话题可以有多通道。一个通道接收到一个话题中所有消息的副本,启用组播方式的传输,使消息同时在每个通道的所有订阅用户间分发,从而实现负载均衡。

SPOF(单点故障)

NSQ 中的 SPOF 指的是消息从生产者中接收到交付给消费者的这个流程中,任何一个单一环节出现故障。

NSQ 设计理念是 分布式去中心化 的拓扑结构,保证信息将至少交付一次。根本原理是:

  • 客户(消息接收体)表示他们已经准备好接收消息;
  • NSQ 发送一条消息,并暂时将数据存储在本地做备份;
  • 客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队该消息。

即交付消息的三种结果中,不会出现消息无缘丢失的事件。而丢失消息的情况只能是 nsqd 端出现问题(如非正常结束,且消息并未备份至磁盘)。因此,针对这个问题,增加 nsqd 的节点可以做一定程度的预防。

关于某个功能模块的特性,在模块分析部分再做详细剖析。

NSQ 使用体验

installing

NSQ 的 官方安装指南 ,建议使用 go get 命令,将 NSQ 项目源码和依赖库都下到本地方便阅读。

配置完成后,执行项目目录下的“./test.sh”脚本,可生成 NSQ 的二进制执行文件。

执行测试

服务端测试搭建

1.执行 NSQ 拓扑监控守护进程: nsqlookupd

nsqlookupd

默认情况下,执行如下命令行参数:

-http-address="0.0.0.0:4161": <addr>:<port>`监听 HTTP 客户端
-tcp-address="0.0.0.0:4160": TCP 客户端监听的 <addr>:<port> 

其中,TCP 端用于监听 nsqd 挂载。

2.另起终端,执行 nsqd,并将其挂载到启动后的 nsqlookupd 监控端口上。

nsqd --lookupd-tcp-address=127.0.0.1:4160

注:nsqd 没有对 nsqlookupd 的 HTTP 端口挂载。

默认情况下,端口 4151 为 HTTP 监听端口, 4150 为 TCP 监听端口,向这两个端口发送消息,则会进入 NSQD 消息队列。

3.开第三个终端,启动 NSQ 状态监控的 WEB UI

nsqadmin --lookupd-http-address=127.0.0.1:4161

此时 web 监控 UI:nsqadmin 挂载的是 nsqlookupd 的 HTTP 端口地址。显示地址为 http://127.0.0.1:4171/

4.开第四个终端,通过 curl 命令执行信息发布。

curl -d "hello world 1" "http://127.0.0.1:4151/put?topic=test"

注:NSQ 的 topic 命名,是以该话题的第一个 PUBSUB 来确定的,并将其创建在 nsqd 中。同时话题的元数据将会传播给 nsqlookupd 的配置。其他的 reader 将会通过周期性的查询 nsqlookupd 来发现这个话题。在该命令执行后,会在第二步中执行的 nsqd 中创建一个名为 test 的 topic。

curl -d 是将信息以 POST 的形式,发送到 4151 端口。

客户端(消费者)获取消息测试

简单的服务端架构已经搭建完成,此时打开监控界面可以看到刚刚发布的消息,一条名为“test”的 topic,点开后, messagechannel 均为 1。

创建一个消费者来执行消息读取测试:

nsq_to_file --topic=test --output-dir=tmp --lookupd-http-address=127.0.0.1:4161

此时 nsq_to_file 便充当了消费者,将消息读取后,存放于目录 tmp 中的时间戳文件,打开文件会看到我们之前发送的消息体: hello world 1

web UI 中也会显示消费者客户端 nsq_to_file 和对应的接收的信息数及连接状态。

如果同时开启多个 nsq_to_file ,则只有一个 nsq_to_file 会接收到新的消息,因为消息一旦交付成功后将会失效,无法被其他消费者接收。

nsq_to_file 只是一个简单的消费者客户端,消息的订阅可以使用官方开发的 go lib 客户端: go-ndq 的 read 和 write 功能。

NSQ 代码阅读

NSQ 分三大模块:nsqd、nsqlookupd、nsqadmin,从哪个开始着手很重要。

本人阅读源码习惯从功能少的、尽量闭环的程序着手。纵观上面三个模块,最提纲挈领,最有桥梁作用的应该是 nsqlookupd ,但 nsqlookupd 仍然有很多的功能开发,在没有搞清楚其功能之前,不适合直接阅读。因此,我把第一个阅读的源码定为 nsq-to-file

nsq-to-file

注意:本地源码地址为: github.com/nsqio/nsq/apps/nsq_to_file

nsq-to-file 作为 NSQ 的一种客户端,其实是在 go-nsq 的基础上做的功能开发。

代码

两个文件: nsq_to_file.gostrftime.go ,均为 main package。

nsq_to_file.go

先找到 main 函数

func main() {
	cfg := nsq.NewConfig()

	// TODO: remove, deprecated
	flag.Var(&nsq.ConfigFlag{cfg}, "reader-opt", "(deprecated) use --consumer-opt")
	flag.Var(&nsq.ConfigFlag{cfg}, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/nsqio/go-nsq#Config)")

	flag.Parse()

第一个函数调用 flag.Parse() 是 golang 标准库中的命令行参数解析,可以追出允许使用什么命令行参数。

nsq_to_file flag 参数功能解析

nsq_to_file 肚子 opt 参数设置:

var (
	showVersion = flag.Bool("version", false, "print version string")

	channel     = flag.String("channel", "nsq_to_file", "nsq channel")
	maxInFlight = flag.Int("max-in-flight", 200, "max number of messages to allow in flight")

	outputDir      = flag.String("output-dir", "/tmp", "directory to write output files to")
	datetimeFormat = flag.String("datetime-format", "%Y-%m-%d_%H", "strftime compatible format for <DATETIME> in filename format")
	filenameFormat = flag.String("filename-format", "<TOPIC>.<HOST><REV>.<DATETIME>.log", "output filename format (<TOPIC>, <HOST>, <PID>, <DATETIME>, <REV> are replaced. <REV> is increased when file already exists)")
	hostIdentifier = flag.String("host-identifier", "", "value to output in log filename in place of hostname. <SHORT_HOST> and <HOSTNAME> are valid replacement tokens")
	gzipLevel      = flag.Int("gzip-level", 6, "gzip compression level (1-9, 1=BestSpeed, 9=BestCompression)")
	gzipEnabled    = flag.Bool("gzip", false, "gzip output files.")
	skipEmptyFiles = flag.Bool("skip-empty-files", false, "Skip writing empty files")
	topicPollRate  = flag.Duration("topic-refresh", time.Minute, "how frequently the topic list should be refreshed")
	topicPattern   = flag.String("topic-pattern", ".*", "Only log topics matching the following pattern")

	rotateSize     = flag.Int64("rotate-size", 0, "rotate the file when it grows bigger than `rotate-size` bytes")
	rotateInterval = flag.Duration("rotate-interval", 0*time.Second, "rotate the file every duration")

	nsqdTCPAddrs     = app.StringArray{}
	lookupdHTTPAddrs = app.StringArray{}
	topics           = app.StringArray{}

	// TODO: remove, deprecated
	gzipCompression = flag.Int("gzip-compression", 3, "(deprecated) use --gzip-level, gzip compression level (1 = BestSpeed, 2 = BestCompression, 3 = DefaultCompression)")
)
consumer-opt

最近两个是 reader-optconsumer-opt ,其实功能相同,建议使用 consumer-opt。

这个命令行功能是为 go-nsq 设置 config 参数,因为 nsq_to_file 是在其基础上开发出来的。而且参数说明中也说了,有可能已经被设置好了的。

以下是 config 结构体的定义(文件: github.com/nsqio/go-nsq/config.go):

type Config struct {
	initialized bool

	// used to Initialize, Validate
	configHandlers []configHandler

	DialTimeout time.Duration `opt:"dial_timeout" default:"1s"`

	// Deadlines for network reads and writes
	ReadTimeout  time.Duration `opt:"read_timeout" min:"100ms" max:"5m" default:"60s"`
	WriteTimeout time.Duration `opt:"write_timeout" min:"100ms" max:"5m" default:"1s"`

	// LocalAddr is the local address to use when dialing an nsqd.
	// If empty, a local address is automatically chosen.
	LocalAddr net.Addr `opt:"local_addr"`

	// Duration between polling lookupd for new producers, and fractional jitter to add to
	// the lookupd pool loop. this helps evenly distribute requests even if multiple consumers
	// restart at the same time
	//
	// NOTE: when not using nsqlookupd, LookupdPollInterval represents the duration of time between
	// reconnection attempts
	LookupdPollInterval time.Duration `opt:"lookupd_poll_interval" min:"10ms" max:"5m" default:"60s"`
	LookupdPollJitter   float64       `opt:"lookupd_poll_jitter" min:"0" max:"1" default:"0.3"`

	// Maximum duration when REQueueing (for doubling of deferred requeue)
	MaxRequeueDelay     time.Duration `opt:"max_requeue_delay" min:"0" max:"60m" default:"15m"`
	DefaultRequeueDelay time.Duration `opt:"default_requeue_delay" min:"0" max:"60m" default:"90s"`

	// Backoff strategy, defaults to exponential backoff. Overwrite this to define alternative backoff algrithms.
	BackoffStrategy BackoffStrategy `opt:"backoff_strategy" default:"exponential"`
	// Maximum amount of time to backoff when processing fails 0 == no backoff
	MaxBackoffDuration time.Duration `opt:"max_backoff_duration" min:"0" max:"60m" default:"2m"`
	// Unit of time for calculating consumer backoff
	BackoffMultiplier time.Duration `opt:"backoff_multiplier" min:"0" max:"60m" default:"1s"`

	// Maximum number of times this consumer will attempt to process a message before giving up
	MaxAttempts uint16 `opt:"max_attempts" min:"0" max:"65535" default:"5"`

	// Duration to wait for a message from a producer when in a state where RDY
	// counts are re-distributed (ie. max_in_flight < num_producers)
	LowRdyIdleTimeout time.Duration `opt:"low_rdy_idle_timeout" min:"1s" max:"5m" default:"10s"`

	// Duration between redistributing max-in-flight to connections
	RDYRedistributeInterval time.Duration `opt:"rdy_redistribute_interval" min:"1ms" max:"5s" default:"5s"`

	// Identifiers sent to nsqd representing this client
	// UserAgent is in the spirit of HTTP (default: "<client_library_name>/<version>")
	ClientID  string `opt:"client_id"` // (defaults: short hostname)
	Hostname  string `opt:"hostname"`
	UserAgent string `opt:"user_agent"`

	// Duration of time between heartbeats. This must be less than ReadTimeout
	HeartbeatInterval time.Duration `opt:"heartbeat_interval" default:"30s"`
	// Integer percentage to sample the channel (requires nsqd 0.2.25+)
	SampleRate int32 `opt:"sample_rate" min:"0" max:"99"`

	// To set TLS config, use the following options:
	//
	// tls_v1 - Bool enable TLS negotiation
	// tls_root_ca_file - String path to file containing root CA
	// tls_insecure_skip_verify - Bool indicates whether this client should verify server certificates
	// tls_cert - String path to file containing public key for certificate
	// tls_key - String path to file containing private key for certificate
	// tls_min_version - String indicating the minimum version of tls acceptable ('ssl3.0', 'tls1.0', 'tls1.1', 'tls1.2')
	//
	TlsV1     bool        `opt:"tls_v1"`
	TlsConfig *tls.Config `opt:"tls_config"`

	// Compression Settings
	Deflate      bool `opt:"deflate"`
	DeflateLevel int  `opt:"deflate_level" min:"1" max:"9" default:"6"`
	Snappy       bool `opt:"snappy"`

	// Size of the buffer (in bytes) used by nsqd for buffering writes to this connection
	OutputBufferSize int64 `opt:"output_buffer_size" default:"16384"`
	// Timeout used by nsqd before flushing buffered writes (set to 0 to disable).
	//
	// WARNING: configuring clients with an extremely low
	// (< 25ms) output_buffer_timeout has a significant effect
	// on nsqd CPU usage (particularly with > 50 clients connected).
	OutputBufferTimeout time.Duration `opt:"output_buffer_timeout" default:"250ms"`

	// Maximum number of messages to allow in flight (concurrency knob)
	MaxInFlight int `opt:"max_in_flight" min:"0" default:"1"`

	// The server-side message timeout for messages delivered to this client
	MsgTimeout time.Duration `opt:"msg_timeout" min:"0"`

	// secret for nsqd authentication (requires nsqd 0.2.29+)
	AuthSecret string `opt:"auth_secret"`
}

从这里看出,使用 golang 默认标识的方式,已经设置好了参数的 key(即 opt 后的值)和 default 值了。暂且不看各功能的含义(太多了),我们先看如何设置。

原文解释:

The only valid way to create a Config is via NewConfig, using a struct literal will panic. After Config is passed into a high-level type (like Consumer, Producer, etc.) the values are no longer mutable (they are copied).

Use Set(option string, value interface{}) as an alternate way to set parameters.

Set 的代码:

// Set takes a comma separated value and follows the rules in Config.Set
// using the first field as the option key, and the second (if present) as the value
func (c *ConfigFlag) Set(opt string) (err error) {
	parts := strings.SplitN(opt, ",", 2)
	key := parts[0]

	switch len(parts) {
	case 1:
		// default options specified without a value to boolean true
		err = c.Config.Set(key, true)
	case 2:
		err = c.Config.Set(key, parts[1])
	}
	return
}

可以看出,对命令行的要求是“key,value”,即使用“,”分隔。

如设置 timeout 的启动命令:

./nsq_to_file --lookupd-http-address=127.0.0.1:4161 --consumer-opt="read_timeout,100s"

继续 main 函数代码阅读

channel
	if *showVersion {	//显示版本信息
		fmt.Printf("nsq_to_file v%s\n", version.Binary)
		return
	}

	if *channel == "" {	//要求必须设置 channel 名称
		log.Fatal("--channel is required")
	}

showVersion 没什么好说的,关键是 channel opt。

通过之前的准备工作,了解到 NSQ 消息分发是无单点的分布式拓扑结构,其中 Topic 和 channel 是其重要的组成单元,一个 Topic 下的信息会同时发送给不同的 channel 中,而同一个 channel 下的多个消费者中,只有一个可以获得此 channel 下的消息。

做一个测试:

再开启一个消费者终端,与之前不同的是,此终端监听 Channel 为"test-channel":

nsq_to_file --lookupd-http-address=127.0.0.1:4161 --channel="test-channel" --output-dir=tmp

向 NSQ-http 端口发送消息:

curl -d "hello world test5" "http://127.0.0.1:4151/put?topic=test"

可以看到输出文件中,有两行 hello world test5 ,这是因为 channel nsq_to_file (默认 channel)和 test-channel 中都会有此条消息,而虽然 channel nsq_to_file 中有三个消费者在等待接收,但只有一个消费者获取到,其余继续等待。 test-channel 中的消费者获取信息后,也会写到/tmp 目录下的文件中,因此两条消息均会被记录。

明白 channel 的作用后,在继续阅读 channel 的使用代码之前,将 main 后面几个同类型 opt 判定也看完:

	var topicsFromNSQLookupd bool	//判断消息来源是否为 NSQ-LOOKUPD

	if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 {
		log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.")
	}
	if len(nsqdTCPAddrs) != 0 && len(lookupdHTTPAddrs) != 0 {
		log.Fatal("use --nsqd-tcp-address or --lookupd-http-address not both")
	}
	//要求必须从 TCP 端口与 HTTP 端口中必须且只能监听其中一个,不可同时监听,也不可都不监听

	if *gzipLevel < 1 || *gzipLevel > 9 {
		log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel)
	}

	// TODO: remove, deprecated
	if hasArg("gzip-compression") { 
		log.Printf("WARNING: --gzip-compression is deprecated in favor of --gzip-level")
		switch *gzipCompression {
		case 1:
			*gzipLevel = gzip.BestSpeed
		case 2:
			*gzipLevel = gzip.BestCompression
		case 3:
			*gzipLevel = gzip.DefaultCompression
		default:
			log.Fatalf("invalid --gzip-compression value (%d), should be 1,2,3", *gzipCompression)
		}
	}	//根据之前设置的压缩等级进行压缩

	cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
	cfg.MaxInFlight = *maxInFlight	// 允许缓存的消息条数上限

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

迟月

暂无简介

0 文章
0 评论
22 人气
更多

推荐作者

雨幕

文章 0 评论 0

alipaysp_XIzGSod4b1

文章 0 评论 0

许强

文章 0 评论 0

别理我

文章 0 评论 0

败给现实

文章 0 评论 0

淡淡の花香

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文