How to: asynchronous callbacks with Netty and Avro

Posted on 2025-01-01 17:21:06

I'm trying to implement asynchronous Avro calls using its NettyServer implementation. After digging through the source code, I found an example of how to use NettyServer in TestNettyServerWithCallbacks.java.

When running a few tests, I realized that NettyServer never calls the hello(Callback) method; instead it keeps calling the synchronous hello() method. The client program prints "Hello", but I'm expecting "Hello-ASYNC". I really have no clue what's going on.

I hope someone can shed some light on this and perhaps point out my mistake. Below is the code I use to perform a simple asynchronous Avro test.

AvroClient.java - Client code.

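import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import org.apache.avro.ipc.CallFuture;
import org.apache.avro.ipc.NettyTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;

import avro.test.Chat;

public class AvroClient {
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        try {
            // Connect to the server listening on port 6666.
            NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
            Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);

            // Issue the call through the callback variant; CallFuture is both a Callback and a Future.
            final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
            client.hello(future1);

            // Block until the response arrives, then print it.
            System.out.println(future1.get());

            transceiver.close();

        } catch (IOException ex) {
            System.err.println(ex);
        }
    }
}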

AvroNetty.java - The Server Code

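import java.net.InetSocketAddress;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import avro.test.Chat;

public class AvroNetty {
    public static void main(String[] args) {
        Chat chatImpl = new ChatImpl();

        // Serve the Chat protocol on port 6666.
        Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
        server.start();
        System.out.println("Server is listening at port " + server.getPort());
    }
}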

ChatImpl.java

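import java.io.IOException;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.util.Utf8;
import avro.test.Chat;

public class ChatImpl implements Chat.Callback {
    // Callback variant: the one I expect the server to invoke.
    @Override
    public void hello(org.apache.avro.ipc.Callback<CharSequence> callback) throws IOException {
        callback.handleResult("Hello-ASYNC");
    }

    // Synchronous variant: the one that actually gets invoked.
    @Override
    public CharSequence hello() throws AvroRemoteException {
        return new Utf8("Hello");
    }
}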

Chat.java - This interface is auto-generated by avro-tools.

@SuppressWarnings("all")
public interface Chat {
    public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
    java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;

    @SuppressWarnings("all")
    public interface Callback extends Chat {
        public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
        void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback) throws java.io.IOException;
    }
}

Here is the Avro protocol definition the interface is generated from:

{
  "namespace": "avro.test",
  "protocol": "Chat",

  "types": [],

  "messages": {
    "hello": {
      "request": [],
      "response": "string"
    }
  }
}

Comments (1)

疾风者 2025-01-08 17:21:06

The NettyServer implementation doesn't actually implement the async style at all; this is a deficiency in the library. Instead of trying to chain services together through callbacks, you need to specify an asynchronous execution handler. Here is what I use to set up my NettyServer for this:

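// Thread pool shared by Netty's boss and worker I/O threads.
ExecutorService es = Executors.newCachedThreadPool();
// Pool that runs request handling off the I/O threads, sized to the available processors; 0, 0 disables the memory limits.
OrderedMemoryAwareThreadPoolExecutor executor = new OrderedMemoryAwareThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 0, 0);
ExecutionHandler executionHandler = new ExecutionHandler(executor);
// responder and addr are defined elsewhere (for example, the SpecificResponder and InetSocketAddress from the question).
final NettyServer server = new NettyServer(responder, addr, new NioServerSocketChannelFactory(es, es), executionHandler);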
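
The idea behind this setup can be illustrated without Avro or Netty at all. The following is only a rough sketch using java.util.concurrent: the names ChatService, PooledServer, and helloAsync are hypothetical stand-ins invented for this example, not Avro or Netty APIs. It assumes, as described above, that the server side only ever invokes the synchronous hello() and that any asynchrony comes from running that call on a worker pool while the caller waits on a future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncDispatchSketch {

    /** Hypothetical stand-in for the generated Chat interface: synchronous method only. */
    interface ChatService {
        CharSequence hello();
    }

    /** Hypothetical stand-in for the server: always calls the synchronous method, on a worker pool. */
    static final class PooledServer implements AutoCloseable {
        private final ChatService service;
        private final ExecutorService workers;

        PooledServer(ChatService service, int threads) {
            this.service = service;
            this.workers = Executors.newFixedThreadPool(threads);
        }

        /** The returned future completes once a worker thread has run hello(). */
        CompletableFuture<CharSequence> helloAsync() {
            return CompletableFuture.supplyAsync(service::hello, workers);
        }

        @Override
        public void close() {
            // Let queued work finish and allow the JVM to exit.
            workers.shutdown();
        }
    }

    /** Runs one call end to end and returns the reply as a String. */
    static String callOnce() throws Exception {
        try (PooledServer server = new PooledServer(() -> "Hello", 2)) {
            // The caller is free to do other work before blocking on get().
            return server.helloAsync().get(5, TimeUnit.SECONDS).toString();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callOnce()); // prints "Hello"
    }
}
```

In this sketch the service implementation stays purely synchronous, which mirrors why the question's client sees "Hello" rather than "Hello-ASYNC": the callback overload on the implementation is never part of the dispatch path.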