如何使用Akka图使用基于演员的源?
我正在尝试通过演员将数据发送到包含风扇的可运行图。
我将源定义为:
final Source<Integer, ActorRef> integerSource =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
10,
OverflowStrategy.dropHead());
但是我不确定如何在 actoref
上通过演员将数据发送到源,以便可运行的图将在接收到时会异步处理消息:
RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(sink, (builder, out) -> {
SourceShape<Integer> sourceShape = builder.add(integerSource);
FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2));
UniformFanInShape<Integer, Integer> merge =
builder.add(Merge.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape);
builder.from(broadcast).via(flow2Shape);
builder.from(flow1Shape)
.viaFanIn(merge)
.to(out);
builder.from(flow2Shape).viaFanIn(merge);
return ClosedShape.getInstance();
} )
);
整个SRC:
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.*;
import akka.stream.javadsl.*;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
@Slf4j
public class GraphActorSource {
private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "flowActorSystem");
public void runFlow() {
final Source<Integer, ActorRef> integerSource =
Source.actorRef(
elem -> {
if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
else return Optional.empty();
},
elem -> Optional.empty(),
10,
OverflowStrategy.dropHead());
Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
.map (x -> {
System.out.println("Flow 1 is processing " + x);
return (x * 2);
});
Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
System.out.println(x);
});
RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
GraphDSL.create(sink, (builder, out) -> {
SourceShape<Integer> sourceShape = builder.add(integerSource);
FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2));
UniformFanInShape<Integer, Integer> merge =
builder.add(Merge.create(2));
builder.from(sourceShape)
.viaFanOut(broadcast)
.via(flow1Shape);
builder.from(broadcast).via(flow2Shape);
builder.from(flow1Shape)
.viaFanIn(merge)
.to(out);
builder.from(flow2Shape).viaFanIn(merge);
return ClosedShape.getInstance();
} )
);
graph.run(actorSystem);
}
public static void main(String args[]){
new GraphActorSource().runFlow();
}
}
如何通过演员将数据发送到可运行的图表?
像? :
integerSource.tell(1)
integerSource.tell(2)
integerSource.tell(3)
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
actorref.tell
工作。构造图形蓝图,以便源 actorref 将在构成蓝图并运行时返回。对于一个实体化的对象,请使用该实现的类型作为
Graph
的实体类型参数。在这里,
IntegerSource
的物料类型参数是actorRef
。Graph
的实现类型参数也是actorRef
。只有
integersource
传递给graphdsl.create
。要访问多个实体物体,必须构建元组捕获它们。如果需要从实体图中获得两个对象,例如SRC和SNK,则
Pair&lt; a,b&gt;
可以捕获两种类型。在这里,
integersource
和sink
都传递给graphdsl.create
。实现的
ActorRef
和完成阶段
是为运行
的结果配对的,pair :: new
。类型
pair&lt; actorRef,ploteionstage&lt; done&gt;&gt;
是graph
的实体类型参数。完整示例:(
build.gradle)
(src/main/java/gruphactorsource.java)
参考akka documentation(访问版本2.6.19)
streams/operators/operators/source.actorref
streams/streams/streams cookbook/与操作员一起工作
ActorRef.tell
works. Construct the graph blueprint so the sourceActorRef
will be returned when the blueprint is materialized and run.For just one materialized object, use that materialized type for the materialized type parameter of the
Graph
.Here the materialized type parameter for
integerSource
isActorRef
.The materialized type parameter for
Graph
is alsoActorRef
.Only
integerSource
is passed toGraphDSL.create
.To access more than one materialized object, a tuple must be constructed to capture them. If two objects from the materialized graph are desired, say src and snk, then
Pair<A,B>
can capture both types.Here both
integersource
andsink
are passed toGraphDSL.create
.The materialized
ActorRef
andCompletionStage
are paired for the result ofrun
withPair::new
.The type
Pair<ActorRef,CompletionStage<Done>>
is the materialized type parameter of theGraph
.Full example:
(build.gradle)
(src/main/java/GraphActorSource.java)
Reference Akka Documentation (accessed Version 2.6.19)
Streams / Operators / Source.actorRef
Streams / Streams Cookbook / Working with operators