如何使用memdb.WatchSet.WatchCtx(ctx)?
我目前正在开发一个网关服务,该服务基本上将 mqtt 主题出版物转换为不同的表示形式,然后在 gRPC 流上发送。为了将不同的主题及其值(json 序列化对象)保存在内存中,我们使用 hashcorps memdb。
选择 memdb 的原因之一是能够注册表更改的观察程序。简单来说,就是将发布的值写入相应的表中,然后通过注册的观察者触发反应。
代码看起来像这样:
func main() {
..
go handle(ctx)
}
func handle(ctx context.Context) {
if ws, al, err := Watch(); err != nil {
// log
} else {
for {
// blocking call according to documentation
if err = ws.WatchCtx(ctx); err != nil {
fmt.Println("received cancel, exit loop")
break
} else {
fmt.Println("received update")
// reinit watcher to prevent being retriggered by the same event
if ws, al, err = Watch(); err != nil {
// log
break
} else {
fmt.Println(al)
}
}
}
}
}
func Watch() (ws memdb.WatchSet, al AccessList, err error) {
txn := db.Txn(false)
if wc, v, e := txn.FirstWatch(accessTable, idIndex, id); e != nil {
err = e
} else if v == nil {
err = memdb.ErrNotFound
} else {
ws = memdb.NewWatchSet()
ws.Add(wc)
al = v.(AccessList)
}
return
}
我不明白的是,每次触发观察者时我们都必须创建一个新的观察者。如果观察者没有重新初始化,ws.WatchCtx() 就不再阻塞。 我的猜测是我们误解了这个概念,因此错误地使用了这些工具。 所以我的问题是如何使用 memdb 手表集。
I am currently working on a gateway service that basically converts mqtt topic publications into a different representation that is then sent on gRPC streams. To hold the different topics and their values (json serialized objects) in-memory, we use hashicorps memdb.
One reason for choosing memdb is the ability to register a watcher for table changes. To put it simply, the published value is written into the corresponding table, which then triggers a reaction via the registered watcher.
The code looks something like this:
func main() {
..
go handle(ctx)
}
func handle(ctx context.Context) {
if ws, al, err := Watch(); err != nil {
// log
} else {
for {
// blocking call according to documentation
if err = ws.WatchCtx(ctx); err != nil {
fmt.Println("received cancel, exit loop")
break
} else {
fmt.Println("received update")
// reinit watcher to prevent being retriggered by the same event
if ws, al, err = Watch(); err != nil {
// log
break
} else {
fmt.Println(al)
}
}
}
}
}
func Watch() (ws memdb.WatchSet, al AccessList, err error) {
txn := db.Txn(false)
if wc, v, e := txn.FirstWatch(accessTable, idIndex, id); e != nil {
err = e
} else if v == nil {
err = memdb.ErrNotFound
} else {
ws = memdb.NewWatchSet()
ws.Add(wc)
al = v.(AccessList)
}
return
}
What I don't understand is the fact that we have to create a new watcher every time the watcher was triggered. If the watcher is not reinitialized, ws.WatchCtx() is not blocking anymore.
My guess is that we have misunderstood the concept and are therefore using the tools incorrectly.
So my question is how to use memdb watchsets.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论