如何使用Akka图使用基于演员的源?

发布于 2025-02-06 20:55:32 字数 4345 浏览 1 评论 0 原文

我正在尝试通过演员将数据发送到包含风扇的可运行图。

我将源定义为:

final Source<Integer, ActorRef> integerSource =
        Source.actorRef(
                elem -> {
                    if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                    else return Optional.empty();
                },
                elem -> Optional.empty(),
                10,
                OverflowStrategy.dropHead());

但是我不确定如何在 actoref 上通过演员将数据发送到源,以便可运行的图将在接收到时会异步处理消息:

RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
        GraphDSL.create(sink, (builder, out) -> {
            SourceShape<Integer> sourceShape = builder.add(integerSource);
            FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
            FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
            UniformFanOutShape<Integer, Integer> broadcast =
                    builder.add(Broadcast.create(2));
            UniformFanInShape<Integer, Integer> merge =
                    builder.add(Merge.create(2));

            builder.from(sourceShape)
                    .viaFanOut(broadcast)
                    .via(flow1Shape);

            builder.from(broadcast).via(flow2Shape);

            builder.from(flow1Shape)
                    .viaFanIn(merge)
                    .to(out);

            builder.from(flow2Shape).viaFanIn(merge);


            return ClosedShape.getInstance();
        } )
);

整个SRC:

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.*;
import akka.stream.javadsl.*;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Slf4j
public class GraphActorSource {

    private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "flowActorSystem");

    public void runFlow() {


        final Source<Integer, ActorRef> integerSource =
                Source.actorRef(
                        elem -> {
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        elem -> Optional.empty(),
                        10,
                        OverflowStrategy.dropHead());


        Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
                .map (x -> {
                    System.out.println("Flow 1 is processing " + x);
                    return (x * 2);
                });

        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
            System.out.println(x);
        });

        RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(sink, (builder, out) -> {
                    SourceShape<Integer> sourceShape = builder.add(integerSource);
                    FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
                    FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
                    UniformFanOutShape<Integer, Integer> broadcast =
                            builder.add(Broadcast.create(2));
                    UniformFanInShape<Integer, Integer> merge =
                            builder.add(Merge.create(2));

                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape);

                    builder.from(broadcast).via(flow2Shape);

                    builder.from(flow1Shape)
                            .viaFanIn(merge)
                            .to(out);

                    builder.from(flow2Shape).viaFanIn(merge);


                    return ClosedShape.getInstance();
                } )
        );

        graph.run(actorSystem);

    }

    public static void main(String args[]){
        new GraphActorSource().runFlow();
    }
}

如何通过演员将数据发送到可运行的图表?

像? :

integerSource.tell(1)
integerSource.tell(2)
integerSource.tell(3)

I'm attempting to send data via an actor to a runnable graph that contains a fan out.

I define the source as :

final Source<Integer, ActorRef> integerSource =
        Source.actorRef(
                elem -> {
                    if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                    else return Optional.empty();
                },
                elem -> Optional.empty(),
                10,
                OverflowStrategy.dropHead());

But I'm unsure how to get a handle on an ActoRef to send data via an actor to the source so that the runnable graph will process messages asynchronously as they are received :

RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
        GraphDSL.create(sink, (builder, out) -> {
            SourceShape<Integer> sourceShape = builder.add(integerSource);
            FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
            FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
            UniformFanOutShape<Integer, Integer> broadcast =
                    builder.add(Broadcast.create(2));
            UniformFanInShape<Integer, Integer> merge =
                    builder.add(Merge.create(2));

            builder.from(sourceShape)
                    .viaFanOut(broadcast)
                    .via(flow1Shape);

            builder.from(broadcast).via(flow2Shape);

            builder.from(flow1Shape)
                    .viaFanIn(merge)
                    .to(out);

            builder.from(flow2Shape).viaFanIn(merge);


            return ClosedShape.getInstance();
        } )
);

Entire src :

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.stream.*;
import akka.stream.javadsl.*;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

@Slf4j
public class GraphActorSource {

    private final static ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "flowActorSystem");

    public void runFlow() {


        final Source<Integer, ActorRef> integerSource =
                Source.actorRef(
                        elem -> {
                            if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
                            else return Optional.empty();
                        },
                        elem -> Optional.empty(),
                        10,
                        OverflowStrategy.dropHead());


        Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
                .map (x -> {
                    System.out.println("Flow 1 is processing " + x);
                    return (x * 2);
                });

        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
            System.out.println(x);
        });

        RunnableGraph<CompletionStage<Done>> graph = RunnableGraph.fromGraph(
                GraphDSL.create(sink, (builder, out) -> {
                    SourceShape<Integer> sourceShape = builder.add(integerSource);
                    FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
                    FlowShape<Integer, Integer> flow2Shape = builder.add(flow1);
                    UniformFanOutShape<Integer, Integer> broadcast =
                            builder.add(Broadcast.create(2));
                    UniformFanInShape<Integer, Integer> merge =
                            builder.add(Merge.create(2));

                    builder.from(sourceShape)
                            .viaFanOut(broadcast)
                            .via(flow1Shape);

                    builder.from(broadcast).via(flow2Shape);

                    builder.from(flow1Shape)
                            .viaFanIn(merge)
                            .to(out);

                    builder.from(flow2Shape).viaFanIn(merge);


                    return ClosedShape.getInstance();
                } )
        );

        graph.run(actorSystem);

    }

    public static void main(String args[]){
        new GraphActorSource().runFlow();
    }
}

How to send data to the Runnable graph via an actor?

Something like ? :

integerSource.tell(1)
integerSource.tell(2)
integerSource.tell(3)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

月朦胧 2025-02-13 20:55:33

actorref.tell 工作。构造图形蓝图,以便源 actorref 将在构成蓝图并运行时返回。

对于一个实体化的对象,请使用该实现的类型作为 Graph 的实体类型参数。

在这里, IntegerSource 的物料类型参数是 actorRef
Graph 的实现类型参数也是 actorRef
只有 integersource 传递给 graphdsl.create

Source<Integer, ActorRef> integerSource = ...

Graph<ClosedShape, ActorRef> graph =
  GraphDSL.create(integerSource, (builder, src) -> {
     ...
  });

RunnableGraph<ActorRef> runnableGraph = RunnableGraph.fromGraph(graph);

ActorRef actorRef = runnableGraph.run(actorSystem);

actorRef.tell(1, ActorRef.noSender());

要访问多个实体物体,必须构建元组捕获它们。如果需要从实体图中获得两个对象,例如SRC和SNK,则 Pair&lt; a,b&gt; 可以捕获两种类型。

在这里, integersource sink 都传递给 graphdsl.create
实现的 ActorRef 完成阶段是为运行的结果配对的, pair :: new
类型 pair&lt; actorRef,ploteionstage&lt; done&gt;&gt; graph 的实体类型参数。

Source<Integer, ActorRef> integerSource = ...
Sink<Integer, CompletionStage<Done>> sink = ...

Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
      GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
....
});

RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
  RunnableGraph.fromGraph(graph);

Pair<ActorRef, CompletionStage<Done>> pair =
  runnableGraph.run(actorSystem);
ActorRef actorRef = pair.first();
CompletionStage<Done> completionStage = pair.second();

actorRef.tell(1, ActorRef.noSender());

完整示例:(

build.gradle)

apply plugin: "java"
apply plugin: "application"
mainClassName = "GraphActorSource"
repositories {
  mavenCentral()
}
dependencies { 
  implementation "com.typesafe.akka:akka-actor-typed_2.13:2.6.19"
  implementation "com.typesafe.akka:akka-stream-typed_2.13:2.6.19"
  implementation 'org.slf4j:slf4j-jdk14:1.7.36'
}
compileJava {
  options.compilerArgs << "-Xlint:unchecked"
}

(src/main/java/gruphactorsource.java)

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status.Success;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.util.Timeout;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class GraphActorSource {

  private final static ActorSystem actorSystem =
    ActorSystem.create(Behaviors.empty(), "flowActorSystem");

  public void runFlow() {

    // 1. Create graph (blueprint)

    // 1a. Define source, flows, and sink

    final Source<Integer, ActorRef> integerSource =
      Source.actorRef
      (
        elem -> {
          if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
          else return Optional.empty();
        },
        elem -> Optional.empty(),
        10,
        OverflowStrategy.dropHead()
      );


    Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
      .map (x -> {
          System.out.println("Flow 1 is processing " + x);
          return (100 + x);
        });
    Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class)
      .map (x -> {
          System.out.println("Flow 2 is processing " + x);
          return (200 + x);
        });

    Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
        System.out.println("Sink received "+x);
      });

    // 1b. Connect nodes and flows into a graph.
    // Inputs and output nodes (source, sink) will be produced at run start.

    Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
      GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
          UniformFanOutShape<Integer, Integer> broadcast =
          builder.add(Broadcast.create(2));
          FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
          FlowShape<Integer, Integer> flow2Shape = builder.add(flow2);
          UniformFanInShape<Integer, Integer> merge =
          builder.add(Merge.create(2));

          builder.from(src)
          .viaFanOut(broadcast);

          builder.from(broadcast.out(0))
          .via(flow1Shape)
          .toInlet(merge.in(0));

          builder.from(broadcast.out(1))
          .via(flow2Shape)
          .toInlet(merge.in(1));

          builder.from(merge)
          .to(snk);

          return ClosedShape.getInstance();
        } );

    RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
      RunnableGraph.fromGraph(graph);

    // 2. Start run,
    // which produces materialized source ActorRef and sink CompletionStage.

    Pair<ActorRef, CompletionStage<Done>> pair =
      runnableGraph.run(actorSystem);
    ActorRef actorRef = pair.first();
    CompletionStage<Done> completionStage = pair.second();

    // On completion, terminates actor system (optional).
    completionStage.thenRun(() -> {
        System.out.println("Done, terminating.");
        actorSystem.terminate();
      });
          
    // 3. Send messages to source actor

    actorRef.tell(1, ActorRef.noSender());
    actorRef.tell(2, ActorRef.noSender());

    // The stream completes successfully with the following message
    actorRef.tell(Done.done(), ActorRef.noSender());

  }

  public static void main(String args[]){
    new GraphActorSource().runFlow();
  }
}

参考akka documentation(访问版本2.6.19)

streams/operators/operators/source.actorref

streams/streams/streams cookbook/与操作员一起工作

ActorRef.tell works. Construct the graph blueprint so the source ActorRef will be returned when the blueprint is materialized and run.

For just one materialized object, use that materialized type for the materialized type parameter of the Graph.

Here the materialized type parameter for integerSource is ActorRef.
The materialized type parameter for Graph is also ActorRef.
Only integerSource is passed to GraphDSL.create.

Source<Integer, ActorRef> integerSource = ...

Graph<ClosedShape, ActorRef> graph =
  GraphDSL.create(integerSource, (builder, src) -> {
     ...
  });

RunnableGraph<ActorRef> runnableGraph = RunnableGraph.fromGraph(graph);

ActorRef actorRef = runnableGraph.run(actorSystem);

actorRef.tell(1, ActorRef.noSender());

To access more than one materialized object, a tuple must be constructed to capture them. If two objects from the materialized graph are desired, say src and snk, then Pair<A,B> can capture both types.

Here both integersource and sink are passed to GraphDSL.create.
The materialized ActorRef and CompletionStage are paired for the result of run with Pair::new.
The type Pair<ActorRef,CompletionStage<Done>> is the materialized type parameter of the Graph.

Source<Integer, ActorRef> integerSource = ...
Sink<Integer, CompletionStage<Done>> sink = ...

Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
      GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
....
});

RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
  RunnableGraph.fromGraph(graph);

Pair<ActorRef, CompletionStage<Done>> pair =
  runnableGraph.run(actorSystem);
ActorRef actorRef = pair.first();
CompletionStage<Done> completionStage = pair.second();

actorRef.tell(1, ActorRef.noSender());

Full example:

(build.gradle)

apply plugin: "java"
apply plugin: "application"
mainClassName = "GraphActorSource"
repositories {
  mavenCentral()
}
dependencies { 
  implementation "com.typesafe.akka:akka-actor-typed_2.13:2.6.19"
  implementation "com.typesafe.akka:akka-stream-typed_2.13:2.6.19"
  implementation 'org.slf4j:slf4j-jdk14:1.7.36'
}
compileJava {
  options.compilerArgs << "-Xlint:unchecked"
}

(src/main/java/GraphActorSource.java)

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Status.Success;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.util.Timeout;

import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class GraphActorSource {

  private final static ActorSystem actorSystem =
    ActorSystem.create(Behaviors.empty(), "flowActorSystem");

  public void runFlow() {

    // 1. Create graph (blueprint)

    // 1a. Define source, flows, and sink

    final Source<Integer, ActorRef> integerSource =
      Source.actorRef
      (
        elem -> {
          if (elem == Done.done()) return Optional.of(CompletionStrategy.immediately());
          else return Optional.empty();
        },
        elem -> Optional.empty(),
        10,
        OverflowStrategy.dropHead()
      );


    Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class)
      .map (x -> {
          System.out.println("Flow 1 is processing " + x);
          return (100 + x);
        });
    Flow<Integer, Integer, NotUsed> flow2 = Flow.of(Integer.class)
      .map (x -> {
          System.out.println("Flow 2 is processing " + x);
          return (200 + x);
        });

    Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(x -> {
        System.out.println("Sink received "+x);
      });

    // 1b. Connect nodes and flows into a graph.
    // Inputs and output nodes (source, sink) will be produced at run start.

    Graph<ClosedShape, Pair<ActorRef, CompletionStage<Done>>> graph =
      GraphDSL.create(integerSource, sink, Pair::new, (builder, src, snk) -> {
          UniformFanOutShape<Integer, Integer> broadcast =
          builder.add(Broadcast.create(2));
          FlowShape<Integer, Integer> flow1Shape = builder.add(flow1);
          FlowShape<Integer, Integer> flow2Shape = builder.add(flow2);
          UniformFanInShape<Integer, Integer> merge =
          builder.add(Merge.create(2));

          builder.from(src)
          .viaFanOut(broadcast);

          builder.from(broadcast.out(0))
          .via(flow1Shape)
          .toInlet(merge.in(0));

          builder.from(broadcast.out(1))
          .via(flow2Shape)
          .toInlet(merge.in(1));

          builder.from(merge)
          .to(snk);

          return ClosedShape.getInstance();
        } );

    RunnableGraph<Pair<ActorRef, CompletionStage<Done>>> runnableGraph =
      RunnableGraph.fromGraph(graph);

    // 2. Start run,
    // which produces materialized source ActorRef and sink CompletionStage.

    Pair<ActorRef, CompletionStage<Done>> pair =
      runnableGraph.run(actorSystem);
    ActorRef actorRef = pair.first();
    CompletionStage<Done> completionStage = pair.second();

    // On completion, terminates actor system (optional).
    completionStage.thenRun(() -> {
        System.out.println("Done, terminating.");
        actorSystem.terminate();
      });
          
    // 3. Send messages to source actor

    actorRef.tell(1, ActorRef.noSender());
    actorRef.tell(2, ActorRef.noSender());

    // The stream completes successfully with the following message
    actorRef.tell(Done.done(), ActorRef.noSender());

  }

  public static void main(String args[]){
    new GraphActorSource().runFlow();
  }
}

Reference Akka Documentation (accessed Version 2.6.19)

Streams / Operators / Source.actorRef

Streams / Streams Cookbook / Working with operators

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文