当另一个 Sink 接收时发出的 Akka Source

发布于 2025-01-17 01:32:32 字数 250 浏览 7 评论 0原文

我有一个源a,它将值发送到接收器b

现在我想要另一个源 c ,每次 b 接收到事件时都会发出一个值。

我的想法是使用另一个可以用作通知程序的接收器d,但随后我需要从接收器创建源的功能。

a.to(b).alsoTo(d)

类似

Source.from(d)

I have a source a that emits values into a sink b.

Now I want to have another source c that emits a value, everytime b receives an event.

My idea was to use another sink d that can be used as a notifier, but then I need the functionality to create a Source from a Sink.

a.to(b).alsoTo(d)

something like

Source.from(d)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

公布 2025-01-24 01:32:32

另一种描述方式是,您希望 a 发出的每个事件都发送到 bc。这就是 BroadcastHub 的作用;它可用于允许多个Sinks 使用来自一个Source 的事件。

如果您将 Source 连接到 BroadcastHub.sink 然后具体化它,您将获得一个新的 Source。然后,这个 Source 可以附加到 2 个或多个 Sink,每个 Sink 将获得原始 发送的消息的副本来源

例如,我将其与 Akka 一起使用,让 Actor 向多个客户端广播消息(对于 gRPC 事件):

val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
  ActorSource.actorRef[Event](
    completionMatcher = PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    16,
    OverflowStrategy.fail
  )
    .toMat(BroadcastHub.sink)(Keep.both)
    .run()

这会创建可在管道中使用的 eventSource 并多次具体化以创建多个流。每次将消息发送到actorRef时,从eventSource具体化的每个流都会接收该消息。

请参阅文档了解更多细节。

Another way of describing this is that you want every event emitted by a to go to both b and c. This is what a BroadcastHub does; it can be used to allow events from one Source to be consumed by multiple Sinks.

If you connect a Source to a BroadcastHub.sink and then materialise it, you get a new Source. This Source can then be attached to 2 or more Sinks and each Sink will get a copy of the message sent by the original Source.

For example I use this with Akka to have a Actor that broadcasts messages to multiple clients (for gRPC events):

val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
  ActorSource.actorRef[Event](
    completionMatcher = PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    16,
    OverflowStrategy.fail
  )
    .toMat(BroadcastHub.sink)(Keep.both)
    .run()

This creates eventSource which can be used in a pipeline and materialised multiple times to create multiple streams. Each time a message is sent to the actorRef, every stream that was materialised from eventSource receives that message.

See the documentation for more details.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文