USB/串口与Netty通信

发布于 2025-01-08 01:36:26 字数 603 浏览 2 评论 0原文

在我们的组织中,我们通过 UDP 和 TCP 实现了自己的协议,让连接到互联网的外部设备与我们使用 Netty 开发的服务器交换消息(确实如此!)。

出于测试目的,我们希望通过 USB/串行接口将这些设备直接连接到我们的计算机(我们尚未选择串行通信库)。我们还希望在计算机上部署/移植为设备开发的嵌入式软件,以模拟设备并使用命名管道(IPC)等直接连接到我们的服务器。

Netty 架构概述文档中,您声称我们可以使用Netty 也适用于此类串行通信:

此外,您甚至可以通过仅替换几行构造函数调用来利用尚未编写的新传输(例如串行端口通信传输)。此外,您可以编写通过扩展核心 API,您自己的传输。

是否有人已经在 Netty 中开发了此类实现,或者其他人是否计划进行此类实现?我还想知道 Netty 是否真的非常适合这种情况,因为 Channel 接口和许多其他接口使用 SocketAddress 来绑定/连接到对等点?

谢谢您的建议、忠告!

In our organisation, we implemented our own protocol over UDP and TCP to let external devices connected to the Internet exchange messages with a server that we developed using Netty (indeed!).

For testing purpose, we would like to connect those devices directly to our computers through USB/serial interface (we did not choose the serial communication library yet). We would also like to deploy/port the embedded software we developed for our devices on our computer to simulate the devices and to connect directly to our server using a named pipe for example (IPC).

In the Architecture Overview documentation of Netty, you claim that we could use Netty as well for such serial communication:

"Also, you are even able to take advantage of new transports which aren't yet written (such as serial port communication transport), again by replacing just a couple lines of constructor calls. Moreover, you can write your own transport by extending the core API."

Is anyone somewhere already developed such implementation in Netty or does someone else plan to do such implementation? I am also wondering if Netty is really well-suited for that since the Channel interface and many other ones use a SocketAddress to bind/connect to a peer?

Thank you for your suggestions, advices!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

肥爪爪 2025-01-15 01:36:26

我想知道您是否可以使用新的 iostream 包来实现这一点。这里你所需要的只是一个输入流和输出流。请参阅[1]

[1] https://github.com/netty/netty/tree/master/transport/src/main/java/io/netty/channel/iostream

I wonder if you may be able to use the new iostream package for that. All you need here is an InputStream and Outputstream. See [1]

[1] https://github.com/netty/netty/tree/master/transport/src/main/java/io/netty/channel/iostream

情绪 2025-01-15 01:36:26

实施这样的解决方案是可能的。我没有遇到与 SocketAddress 绑定的问题。


我正在发布我的 USB 连接与 Netty 的实现。

串行通信非常相似,为了简洁起见,我不会发布它。不过,如果有人需要的话,我也很乐意添加它。

这是连接的基类。 ChannelHandler根据通信需要实现。

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

import java.net.SocketAddress;

public abstract class ConnectorImpl {

    protected ChannelHandler handler;
    protected Bootstrap bootstrap;
    protected ChannelFuture channelFuture;

    public ChannelFuture connect() throws Exception {
        if (!isConnected()) {
            channelFuture = bootstrap.connect(getSocketAddress()).sync();
        }
        return channelFuture.channel().closeFuture();
    }

    public boolean isConnected() {
        try {
            return channelFuture.channel().isOpen();
        } catch (NullPointerException ex) {
            return false;
        }
    }

    public void close() {
        if (!isConnected()) {
            return;
        }
        try {
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
        }
    }

    protected ChannelOutboundHandlerAdapter createOutgoingErrorHandler() {
        return new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                final ChannelFutureListener channelFutureListener = future -> {
                    if (!future.isSuccess()) {
                        future.channel().close();
                    }
                };
                promise.addListener(channelFutureListener);
                ctx.write(msg, promise);
            }
        };
    }

    public abstract SocketAddress getSocketAddress();
}

需要对该连接器进行扩展以实现所需的连接类型以及通道实现。

USB 连接器:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import javax.usb.UsbDevice;
import java.net.SocketAddress;

import java.util.concurrent.TimeUnit;

public class UsbConnectorImpl extends ConnectorImpl {

    private static final int READ_TIMEOUT = 60;
    private final UsbDevice usbDevice;

    public UsbConnectorImpl(UsbChannelHandler handler, UsbDevice usbDevice) {
        this.handler = handler;
        this.usbDevice = usbDevice;
        this.bootstrap = new Bootstrap()
                .channel(getChannelClass())
                .group(getLoop())
                .handler(getChannelInitializer());
    }

    public EventLoopGroup getLoop() {
        return new NioEventLoopGroup(1);
    }

    Class<UsbAsyncChannel> getChannelClass() {
        return UsbAsyncChannel.class;
    }

    ChannelInitializer<Channel> getChannelInitializer() {
        return new ChannelInitializer<Channel>() {

            @Override
            public void initChannel(@SuppressWarnings("NullableProblems") Channel ch) {
                ch.pipeline()
                        .addLast("Generic encoder", new RequestEncoder())
                        .addLast("Decoder", new ResponseDecoder())
                        .addLast("Read timeout handler", new ReadTimeoutHandler(READ_TIMEOUT, TimeUnit.SECONDS))
                        .addLast("Outgoing Error Handler", createOutgoingErrorHandler())
                        .addLast("Card Reader handler", handler);
            }
        };
    }

    public SocketAddress getSocketAddress() {
        return new UsbDeviceAddress(usbDevice);
    }
}

USB 通道:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.channel.nio.AbstractNioByteChannel;
import org.usb4java.LibUsb;

import javax.usb.UsbConfiguration;
import javax.usb.UsbDevice;
import javax.usb.UsbEndpoint;
import javax.usb.UsbInterface;
import javax.usb.UsbPipe;
import javax.usb.event.UsbPipeDataEvent;
import javax.usb.event.UsbPipeErrorEvent;
import javax.usb.event.UsbPipeListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Pipe;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class UsbChannel extends AbstractNioByteChannel {

    protected static final byte INTERFACE_BULK_PIPES = (byte) 1;
    private static final AtomicInteger READ_TASK_COUNTER = new AtomicInteger();
    private final UsbChannelConfig config;
    protected UsbPipe outPipe = null;
    protected UsbPipe inPipe = null;
    private UsbDevice usbDevice;
    private UsbDeviceAddress deviceAddress;
    private UsbInterface usbInterface;

    public UsbChannel() throws IOException {
        super(null, Pipe.open().source());
        config = new UsbChannelConfig(this);
    }

    @Override
    public UsbChannelConfig config() {
        return config;
    }

    @Override
    public boolean isActive() {
        return usbDevice != null;
    }

    @Override
    protected ChannelFuture shutdownInput() {
        try {
            doClose();
        } catch (Exception e) {
            pipeline().fireExceptionCaught(e);
        }
        return null;
    }

    protected abstract ReadTask createReadTask();

    protected void invokeRead() {
        ReadTask task = createReadTask();
        task.scheduledFuture = eventLoop().schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new UsbUnsafe();
    }

    @Override
    protected long doWriteFileRegion(FileRegion region) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected int doReadBytes(ByteBuf buf) throws Exception {
        return 0;
    }

    @Override
    protected Pipe.SourceChannel javaChannel() {
        return (Pipe.SourceChannel) super.javaChannel();
    }

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        UsbDeviceAddress remote = (UsbDeviceAddress) remoteAddress;
        usbDevice = remote.value();

        UsbConfiguration configuration = usbDevice.getActiveUsbConfiguration();
        usbInterface = configuration.getUsbInterface(INTERFACE_BULK_PIPES);
        usbInterface = usbInterface.getActiveSetting();
        usbInterface.claim();

        for (int i = 0; i < usbInterface.getUsbEndpoints().size(); i++) {
            UsbEndpoint endpoint = (UsbEndpoint) usbInterface.getUsbEndpoints().get(i);
            UsbPipe usbPipe = endpoint.getUsbPipe();
            if (endpoint.getDirection() == LibUsb.ENDPOINT_IN) {
                inPipe = usbPipe;
                inPipe.open();
            } else if (endpoint.getDirection() == LibUsb.ENDPOINT_OUT) {
                outPipe = usbPipe;
                outPipe.open();
            }
            if (inPipe != null && outPipe != null) {
                break;
            }
        }

        outPipe.addUsbPipeListener(new UsbPipeListener() {
            @Override
            public void errorEventOccurred(UsbPipeErrorEvent event) {
                pipeline().fireExceptionCaught(event.getUsbException());
            }

            @Override
            public void dataEventOccurred(UsbPipeDataEvent event) {
                invokeRead();
            }
        });

        inPipe.addUsbPipeListener(new UsbPipeListener() {
            @Override
            public void errorEventOccurred(UsbPipeErrorEvent event) {
                pipeline().fireExceptionCaught(event.getUsbException());
            }

            @Override
            public void dataEventOccurred(UsbPipeDataEvent event) {
                pipeline().fireChannelRead(Unpooled.wrappedBuffer(event.getData(), 0, event.getData().length));
            }
        });

        deviceAddress = remote;

        return true;
    }

    @Override
    protected void doFinishConnect() throws Exception {
    }

    @Override
    public UsbDeviceAddress localAddress() {
        return (UsbDeviceAddress) super.localAddress();
    }

    @Override
    public UsbDeviceAddress remoteAddress() {
        return (UsbDeviceAddress) super.remoteAddress();
    }

    @Override
    protected UsbDeviceAddress localAddress0() {
        return deviceAddress;
    }

    @Override
    protected UsbDeviceAddress remoteAddress0() {
        return deviceAddress;
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void doDisconnect() throws Exception {
        doClose();
    }

    @Override
    protected void doClose() throws Exception {
        try {
            super.doClose();
            javaChannel().close();
        } finally {
            if (inPipe != null) {
                inPipe.close();
                inPipe = null;
            }
            if (outPipe != null) {
                outPipe.close();
                outPipe = null;
            }
            if (usbInterface != null) {
                usbInterface.release();
                usbInterface = null;
            }
            if (usbDevice != null) {
                usbDevice = null;
            }
        }
    }

    protected abstract static class ReadTask implements Runnable, ChannelFutureListener {

        protected final int id;
        protected ScheduledFuture<?> scheduledFuture;

        public ReadTask() {
            this.id = READ_TASK_COUNTER.incrementAndGet();
        }
    }

    private final class UsbUnsafe extends AbstractNioUnsafe {
        @Override
        public void read() {
        }
    }
}

It is possible to implement such a solutions. I have not meet problems with binding with SocketAddress.


I’m posting my implementation of USB connection with Netty.

Serial communication is quite simillar, I'm not posting it for brevity. However I am happy to add it as well if anyone needs it.

Here is base class for connection. A ChannelHandler shall be implemented according to communication needs.

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

import java.net.SocketAddress;

public abstract class ConnectorImpl {

    protected ChannelHandler handler;
    protected Bootstrap bootstrap;
    protected ChannelFuture channelFuture;

    public ChannelFuture connect() throws Exception {
        if (!isConnected()) {
            channelFuture = bootstrap.connect(getSocketAddress()).sync();
        }
        return channelFuture.channel().closeFuture();
    }

    public boolean isConnected() {
        try {
            return channelFuture.channel().isOpen();
        } catch (NullPointerException ex) {
            return false;
        }
    }

    public void close() {
        if (!isConnected()) {
            return;
        }
        try {
            channelFuture.channel().close().sync();
        } catch (InterruptedException e) {
        }
    }

    protected ChannelOutboundHandlerAdapter createOutgoingErrorHandler() {
        return new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                final ChannelFutureListener channelFutureListener = future -> {
                    if (!future.isSuccess()) {
                        future.channel().close();
                    }
                };
                promise.addListener(channelFutureListener);
                ctx.write(msg, promise);
            }
        };
    }

    public abstract SocketAddress getSocketAddress();
}

An extensions of that connector for needed type of connection together with Channel implementations is needed.

USB connector:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.timeout.ReadTimeoutHandler;
import javax.usb.UsbDevice;
import java.net.SocketAddress;

import java.util.concurrent.TimeUnit;

public class UsbConnectorImpl extends ConnectorImpl {

    private static final int READ_TIMEOUT = 60;
    private final UsbDevice usbDevice;

    public UsbConnectorImpl(UsbChannelHandler handler, UsbDevice usbDevice) {
        this.handler = handler;
        this.usbDevice = usbDevice;
        this.bootstrap = new Bootstrap()
                .channel(getChannelClass())
                .group(getLoop())
                .handler(getChannelInitializer());
    }

    public EventLoopGroup getLoop() {
        return new NioEventLoopGroup(1);
    }

    Class<UsbAsyncChannel> getChannelClass() {
        return UsbAsyncChannel.class;
    }

    ChannelInitializer<Channel> getChannelInitializer() {
        return new ChannelInitializer<Channel>() {

            @Override
            public void initChannel(@SuppressWarnings("NullableProblems") Channel ch) {
                ch.pipeline()
                        .addLast("Generic encoder", new RequestEncoder())
                        .addLast("Decoder", new ResponseDecoder())
                        .addLast("Read timeout handler", new ReadTimeoutHandler(READ_TIMEOUT, TimeUnit.SECONDS))
                        .addLast("Outgoing Error Handler", createOutgoingErrorHandler())
                        .addLast("Card Reader handler", handler);
            }
        };
    }

    public SocketAddress getSocketAddress() {
        return new UsbDeviceAddress(usbDevice);
    }
}

USB Channel:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
import io.netty.channel.nio.AbstractNioByteChannel;
import org.usb4java.LibUsb;

import javax.usb.UsbConfiguration;
import javax.usb.UsbDevice;
import javax.usb.UsbEndpoint;
import javax.usb.UsbInterface;
import javax.usb.UsbPipe;
import javax.usb.event.UsbPipeDataEvent;
import javax.usb.event.UsbPipeErrorEvent;
import javax.usb.event.UsbPipeListener;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.Pipe;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class UsbChannel extends AbstractNioByteChannel {

    protected static final byte INTERFACE_BULK_PIPES = (byte) 1;
    private static final AtomicInteger READ_TASK_COUNTER = new AtomicInteger();
    private final UsbChannelConfig config;
    protected UsbPipe outPipe = null;
    protected UsbPipe inPipe = null;
    private UsbDevice usbDevice;
    private UsbDeviceAddress deviceAddress;
    private UsbInterface usbInterface;

    public UsbChannel() throws IOException {
        super(null, Pipe.open().source());
        config = new UsbChannelConfig(this);
    }

    @Override
    public UsbChannelConfig config() {
        return config;
    }

    @Override
    public boolean isActive() {
        return usbDevice != null;
    }

    @Override
    protected ChannelFuture shutdownInput() {
        try {
            doClose();
        } catch (Exception e) {
            pipeline().fireExceptionCaught(e);
        }
        return null;
    }

    protected abstract ReadTask createReadTask();

    protected void invokeRead() {
        ReadTask task = createReadTask();
        task.scheduledFuture = eventLoop().schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new UsbUnsafe();
    }

    @Override
    protected long doWriteFileRegion(FileRegion region) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected int doReadBytes(ByteBuf buf) throws Exception {
        return 0;
    }

    @Override
    protected Pipe.SourceChannel javaChannel() {
        return (Pipe.SourceChannel) super.javaChannel();
    }

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        UsbDeviceAddress remote = (UsbDeviceAddress) remoteAddress;
        usbDevice = remote.value();

        UsbConfiguration configuration = usbDevice.getActiveUsbConfiguration();
        usbInterface = configuration.getUsbInterface(INTERFACE_BULK_PIPES);
        usbInterface = usbInterface.getActiveSetting();
        usbInterface.claim();

        for (int i = 0; i < usbInterface.getUsbEndpoints().size(); i++) {
            UsbEndpoint endpoint = (UsbEndpoint) usbInterface.getUsbEndpoints().get(i);
            UsbPipe usbPipe = endpoint.getUsbPipe();
            if (endpoint.getDirection() == LibUsb.ENDPOINT_IN) {
                inPipe = usbPipe;
                inPipe.open();
            } else if (endpoint.getDirection() == LibUsb.ENDPOINT_OUT) {
                outPipe = usbPipe;
                outPipe.open();
            }
            if (inPipe != null && outPipe != null) {
                break;
            }
        }

        outPipe.addUsbPipeListener(new UsbPipeListener() {
            @Override
            public void errorEventOccurred(UsbPipeErrorEvent event) {
                pipeline().fireExceptionCaught(event.getUsbException());
            }

            @Override
            public void dataEventOccurred(UsbPipeDataEvent event) {
                invokeRead();
            }
        });

        inPipe.addUsbPipeListener(new UsbPipeListener() {
            @Override
            public void errorEventOccurred(UsbPipeErrorEvent event) {
                pipeline().fireExceptionCaught(event.getUsbException());
            }

            @Override
            public void dataEventOccurred(UsbPipeDataEvent event) {
                pipeline().fireChannelRead(Unpooled.wrappedBuffer(event.getData(), 0, event.getData().length));
            }
        });

        deviceAddress = remote;

        return true;
    }

    @Override
    protected void doFinishConnect() throws Exception {
    }

    @Override
    public UsbDeviceAddress localAddress() {
        return (UsbDeviceAddress) super.localAddress();
    }

    @Override
    public UsbDeviceAddress remoteAddress() {
        return (UsbDeviceAddress) super.remoteAddress();
    }

    @Override
    protected UsbDeviceAddress localAddress0() {
        return deviceAddress;
    }

    @Override
    protected UsbDeviceAddress remoteAddress0() {
        return deviceAddress;
    }

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void doDisconnect() throws Exception {
        doClose();
    }

    @Override
    protected void doClose() throws Exception {
        try {
            super.doClose();
            javaChannel().close();
        } finally {
            if (inPipe != null) {
                inPipe.close();
                inPipe = null;
            }
            if (outPipe != null) {
                outPipe.close();
                outPipe = null;
            }
            if (usbInterface != null) {
                usbInterface.release();
                usbInterface = null;
            }
            if (usbDevice != null) {
                usbDevice = null;
            }
        }
    }

    protected abstract static class ReadTask implements Runnable, ChannelFutureListener {

        protected final int id;
        protected ScheduledFuture<?> scheduledFuture;

        public ReadTask() {
            this.id = READ_TASK_COUNTER.incrementAndGet();
        }
    }

    private final class UsbUnsafe extends AbstractNioUnsafe {
        @Override
        public void read() {
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文