使用 Consul 的 watch 机制监控服务的变化
这篇文章讲述了 Consul 的 watch 用法,以及使用 Golang 来监控 Consul 中 service 的变化。
前言
consul 常常被用来作服务注册与服务发现,而它的 watch 机制则可被用来监控一些数据的更新,包括:nodes, KV pairs, health checks 等。另外,在监控到数据变化后,还可以调用外部处理程序,此处理程序可以是任何可执行文件或 HTTP 调用,具体说明可见 官网 。
watch 探究
根据官方描述,watch 的实现依赖于 consul 的 HTTP API 的 blocking queries 。引用官方的文档:
Many endpoints in Consul support a feature known as "blocking queries". A blocking query is used to wait for a potential change using long polling. Not all endpoints support blocking, but each endpoint uniquely documents its support for blocking queries in the documentation.
Endpoints that support blocking queries return an HTTP header named
X-Consul-Index
. This is a unique identifier representing the current state of the requested resource.
On subsequent requests for this resource, the client can set the
index
query string parameter to the value ofX-Consul-Index
, indicating that the client wishes to wait for any changes subsequent to that index.
也就是说,blocking queries 属于长轮询的一种,如果 consul 的 http api (官方称作 endpoint) 支持 blocking queries,那么之后对该 api 进行请求的时候添加 index
参数,那么该请求将会被阻塞,直到其请求的数据有变化时才会响应结果,换句话说,就是对该 endpoint 启动了长轮询。
长轮训减少了频繁轮训的所造成的不必要的带宽和服务器资源开销,用在服务发现上,即时性也能有所保证。
说了这么多,让我们来尝试一下:
以生产环境启动 consul 的一个 agent:
$ ./consul agent -dev
在 consul 中添加一个键值对:
./consul kv put loglevel ERROR
启动一个访问 consul kv 数据中心的 http api:
curl -v http://localhost:8500/v1/kv/loglevel
在我这里,该 api 请求返回的响应头中的 X-Consul-Index
的值为 9981
对上一步的 api 添加 index 请求参数继续请求:
curl -v http://localhost:8500/v1/kv/loglevel?index=9981
此时该请求将会被阻塞。
在另一个 terminal 命令行中,更新第二步添加的 key 的值:
./consul kv put loglevel INFO
此时,第四步被阻塞的请求不再阻塞,并立马返回响应。
上述步骤演示了 watch 利用 blocking queries 对 KV 进行监控,而正如文章开始说的,watch 还支持对其他数据类型进行监控,其监控的数据类型有以下这些:
key
- Watch a specific KV pairkeyprefix
- Watch a prefix in the KV storeservices
- Watch the list of available servicesnodes
- Watch the list of nodesservice
- Watch the instances of a servicechecks
- Watch the value of health checksevent
- Watch for custom user events
其中对各个监控类型的使用方法,见 官方文档
Golang 实现 watch 对服务变化的监控
consul 官方提供了 Golang 版的 watch 包 。其实际上也是对 watch 机制进行了一层封装,最终代码实现的还是对 consul HTTP API 的 endpoints
的使用。
文章开始说过,“在监控到数据变化后,还可以调用外部处理程序”。是了,数据变化后调用外部处理程序才是有意义的,Golang 的 watch 包中对应的外部处理程序是一个函数 handler。因为业务的关系,这里只实现了 watch 对 service 的变化的监控,其主要创建了一个 plan 来对整个服务的变化做一个监控,以及再为每个服务创建一个 plan,对单个服务变化作监控。话不多说,上代码:
package main
import (
"fmt"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"sync"
)
type WatchHandler interface {
Handler(uint64, interface{})
}
// ConsulWatch used to store all plans
type ConsulWatch struct {
watchers map[string]*watch.Plan // store plans
RWMutex *sync.RWMutex
}
// Handler used to watch whole consul services changes
func (c ConsulWatch) Handler(_ uint64, data interface{}) {
switch d := data.(type) {
// "services" watch type returns map[string][]string type. follow: https://www.consul.io/docs/dynamic-app-config/watches#services
case map[string][]string:
fmt.Println("d: ", d)
for k := range d {
if _, ok := c.watchers[k]; ok || k == "consul" {
continue
}
// start creating one watch plan to watch every service
c.InsertServiceWatch(k)
}
// read watchers and delete deregister services
c.RWMutex.RLock()
defer c.RWMutex.RUnlock()
watchers := c.watchers
fmt.Println("watchers: ", watchers)
for k, plan := range watchers {
if _, ok := d[k]; !ok {
plan.Stop()
delete(watchers, k)
}
}
default:
fmt.Printf("can't decide the watch type: %v\n", &d)
}
}
// NewWatchPlan new watch plan
func NewWatchPlan(watchType string, opts map[string]interface{}, handler WatchHandler) (*watch.Plan, error) {
var options = map[string]interface{}{
"type": watchType,
}
// combine params
for k, v := range opts {
options[k] = v
}
pl, err := watch.Parse(options)
if err != nil {
return nil, err
}
pl.Handler = handler.Handler
return pl, nil
}
func RunWatchPlan(plan *watch.Plan, address string) error {
defer plan.Stop()
err := plan.Run(address)
if err != nil {
fmt.Println("run consul error: ", err)
return err
}
return nil
}
func StartConsulWatch() {
// please replace 10.xx.xx.xx with the ture ip address.
address := fmt.Sprintf("%s:%d", "10.xx.xx.xx", 8500)
cw := ConsulWatch{
watchers: make(map[string]*watch.Plan),
RWMutex: new(sync.RWMutex),
}
wp, err := NewWatchPlan("services", nil, cw)
if err != nil {
fmt.Printf("new watch plan failed: %v\n", err)
}
err = RunWatchPlan(wp, address)
if err != nil {
return
}
}
// ServiceWatch is single service
type ServiceWatch struct {
Address string
}
func (s ServiceWatch) Handler(_ uint64, data interface{}) {
switch d := data.(type) {
case []*consulapi.ServiceEntry:
for _, entry := range d {
fmt.Println(fmt.Sprintf("service ip %s ", entry.Service.Address))
fmt.Println("service status: ", entry.Checks.AggregatedStatus())
}
}
}
func (c ConsulWatch) InsertServiceWatch(serviceName string) {
serviceOpts := map[string]interface{}{
"service": serviceName,
}
sw := ServiceWatch{
// please replace 10.xx.xx.xx with the ture ip address.
Address: fmt.Sprintf("%s:%d", "10.xx.xx.xx", 8500),
}
servicePlan, err := NewWatchPlan("service", serviceOpts, sw)
if err != nil {
fmt.Printf("new service watch failed: %v", err)
}
go func() {
_ = RunWatchPlan(servicePlan, sw.Address)
}()
defer c.RWMutex.Unlock()
c.RWMutex.Lock()
c.watchers[serviceName] = servicePlan
}
func main() {
StartConsulWatch()
}
当启动 consul Agent 后,运行上述代码,然后注册服务(服务注册的代码可利用搜索引擎),观察代码运行的结果。
参考资料
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: Java 设计模式 - 单例模式
下一篇: 彻底找到 Tomcat 启动速度慢的元凶
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论