纳特斯流程一次交货一次

发布于 2025-02-11 19:09:31 字数 127 浏览 0 评论 0原文

我想通过NATS Jetstream实现一次一次交付系统。文档说Jetstream具有此选项,但是没有有关其工作方式以及客户如何实现此功能的示例或详细信息。我知道在出版商方面,我们可以在创建流时设置MSGID并指定重复窗口,但是消费者端呢?

I want to implement an exactly once delivery system with Nats Jetstream. Documentation says that Jetstream has this option, but there is no samples or details about that how it's work and how clients can implement this. I know that in publisher side we can set MsgId and specify duplication window when creating Stream, but what about consumer side?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

瞄了个咪的 2025-02-18 19:09:31

这是 noreferrer“>确切的企业交付)的文档。这有点不当,因为实际所需的内容(以及此功能提供的内容)是恰好是处理

正如您指出的那样,它是服务器收到已发布的消息时服务器的结合,以及通过已接收消息的订阅(如有必要的必要)进行订阅的A double ACK 调用。

这是一个示例(对于简洁而省略了多余的错误处理)。使用启用Jetstream启动服务器:nats-server -js,然后运行此代码(假设Nats.go v1.16+)。

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, _ := nats.Connect(nats.DefaultURL)
    js, _ := nc.JetStream()

    // Create a test stream.
    _, err := js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync both to ensure the server received the ack.
    batch[0].AckSync()
    batch[1].AckSync()

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

值得注意的是,如果acksync确实失败(可以从中返回错误),则在此代码上再次重试ACK,直到收到响应。来自客户的冗余ACK是一个无障碍。

Here are the docs for exactly-once delivery. This is a bit of a misnomer since what is actually needed (and what this feature provides) is exactly-once processing.

As you point out, it is a combination of de-duplication by the server when receiving a published message as well as a double ack call by the subscription that had received the message (plus retries if necessary).

Here is an example (excess error handling elided for brevity). Start the server with JetStream enabled: nats-server --js and then run this code (it assuming nats.go v1.16+).

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func failOnErr(err error) {
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Connect and get the JetStream context.
    nc, _ := nats.Connect(nats.DefaultURL)
    js, _ := nc.JetStream()

    // Create a test stream.
    _, err := js.AddStream(&nats.StreamConfig{
        Name:       "test",
        Storage:    nats.MemoryStorage,
        Subjects:   []string{"test.>"},
        Duplicates: time.Minute,
    })
    failOnErr(err)

    defer js.DeleteStream("test")

    // Publish some messages with duplicates.
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))
    js.Publish("test.2", []byte("world"), nats.MsgId("2"))

    // Create an explicit pull consumer on the stream.
    _, err = js.AddConsumer("test", &nats.ConsumerConfig{
        Durable:       "test",
        AckPolicy:     nats.AckExplicitPolicy,
        DeliverPolicy: nats.DeliverAllPolicy,
    })
    failOnErr(err)
    defer js.DeleteConsumer("test", "test")

    // Create a subscription on the pull consumer.
    // Subject can be empty since it defaults to all subjects bound to the stream.
    sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
    failOnErr(err)

    // Only two should be delivered.
    batch, _ := sub.Fetch(10)
    log.Printf("%d messages", len(batch))

    // AckSync both to ensure the server received the ack.
    batch[0].AckSync()
    batch[1].AckSync()

    // Should be zero.
    batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
    log.Printf("%d messages", len(batch))
}

It is worth noting that if an AckSync does fail (an error can be returned from it) then its on this code to retry the ack again until a response is received. A redundant ack from the client is a no-op.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文