Java RMI + SSL + Compression = IMPOSSIBLE!

Posted on 2024-08-23 07:09:59

I've set up RMI + SSL. This works great. But it doesn't seem possible to slip compression in between RMI and SSL, so that the RMI requests are compressed before they're sent over SSL.

I've seen some posts online suggest using SSLSocketFactory.createSocket(), which takes a Socket, to wrap SSL over a compressing socket. But that seems like it would try to compress the SSL protocol itself, which probably isn't very compressible.

I suppose I should create a Socket proxy (a subclass of Socket that defers to another Socket, like FilterOutputStream does), have the proxy wrap the input/output streams with compression, and have my SocketFactory and ServerSocketFactory return the proxies, wrapping the SSLSocket.

But then we have the buffering issue. Compression buffers the data until it gets enough worth compressing, or is told to flush. This is fine when you don't have back-and-forth communication over the socket. But with cached sockets in RMI, you have exactly that, with no way to identify the end of an RMI request so you can flush your compressed data.

Sun has an RMISocketFactory example doing something like this, but they don't address this at all.

Notes:
1. SSL supports compression, but I can't find anything about enabling that in JSSE.
2. I know that compression on lots of small unrelated blocks (as RMI is usually composed of) isn't very beneficial.
3. I know that if I'm sending large requests, RMI isn't the best choice.
4. There is an SSLRMISocketFactory in Java 6, but it doesn't add anything over my custom implementation.


绿萝 2024-08-30 07:09:59

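We have several problems here:

  • We can't simply wrap SocketFactories around each other, like we can do for InputStreams and OutputStreams.
  • Java's zlib-based DeflaterOutputStream does not implement flushing.

I think I found a mechanism by which this seems to work.

This will be a multi-part series, as it needs some time to write.
(You can find the source code of the completed parts in my github repository.)

A custom SocketImpl

A Socket is always backed by an object implementing SocketImpl. Thus, having a custom socket in fact means using a custom SocketImpl class. Here is an implementation based on a pair of streams (and a base socket, for closing purposes):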

/**
 * A SocketImpl implementation which works on a pair
 * of streams.
 *
 * An instance of this class represents an already
 * connected socket, thus all the methods relating to
 * connecting, accepting and such are not implemented.
 *
 * The implemented methods are {@link #getInputStream},
 * {@link #getOutputStream}, {@link #available} and the
 * shutdown methods {@link #close}, {@link #shutdownInput},
 * {@link #shutdownOutput}.
 */
private static class WrappingSocketImpl extends SocketImpl {
    private InputStream inStream;
    private OutputStream outStream;

    private Socket base;
    
    WrappingSocketImpl(StreamPair pair, Socket base) {
        this.inStream = pair.input;
        this.outStream = pair.output;
        this.base = base;
    }

A StreamPair is a simple data holder class, see below.

These are the important methods:

    protected InputStream getInputStream() {
        return inStream;
    }

    protected OutputStream getOutputStream() {
        return outStream;
    }

    protected int available() throws IOException {
        return inStream.available();
    }

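Then come some methods to allow closing. These are not really tested (maybe we should also close or at least flush the streams?), but they seem to work for our RMI usage.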

    protected void close() throws IOException {
        base.close();
    }

    protected void shutdownInput() throws IOException {
        base.shutdownInput();
        // TODO: inStream.close() ?
    }

    protected void shutdownOutput() throws IOException {
        base.shutdownOutput();
        // TODO: outStream.close()?
    }

The next few methods will be called by the Socket constructor (or indirectly by something in the RMI engine), but don't really have to do anything.

    protected void create(boolean stream) {
        if(!stream) {
            throw new IllegalArgumentException("datagram socket not supported.");
        }
    }

    public Object getOption(int optID) {
        System.err.println("getOption(" + optID + ")");
        return null;
    }

    public void setOption(int optID, Object value) {
        // noop, as we don't have any options.
    }

None of the remaining methods is needed. We implement them by throwing exceptions, so we will notice if this assumption was wrong.

    // unsupported operations

    protected void connect(String host, int port) {
        System.err.println("connect(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }


    protected void connect(InetAddress address, int port) {
        System.err.println("connect(" + address + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void connect(SocketAddress addr, int timeout) {
        System.err.println("connect(" + addr + ", " + timeout + ")");
        throw new UnsupportedOperationException();
    }

    protected void bind(InetAddress host, int port) {
        System.err.println("bind(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void listen(int backlog) {
        System.err.println("listen(" + backlog + ")");
        throw new UnsupportedOperationException();
    }

    protected void accept(SocketImpl otherSide) {
        System.err.println("accept(" + otherSide + ")");
        throw new UnsupportedOperationException();
    }

    protected void sendUrgentData(int data) {
        System.err.println("sendUrgentData()");
        throw new UnsupportedOperationException();
    }
}

Here is the StreamPair used by the constructor:

/**
 * A simple holder class for a pair of streams.
 */
public static class StreamPair {
    public InputStream input;
    public OutputStream output;
    public StreamPair(InputStream in, OutputStream out) {
        this.input = in; this.output = out;
    }
}

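Next part: use this to implement a Socket factory.


A Socket factory, wrapping another one.

We are dealing here with RMI socket factories (i.e. RMIClientSocketFactory, RMIServerSocketFactory and RMISocketFactory in java.rmi.server), but the same idea applies to other libraries using a socket factory interface as well. Examples are javax.net.SocketFactory (and ServerSocketFactory), Apache Axis' SocketFactory, and JSch's SocketFactory.

Often, the idea of these factories is that they connect to a server other than the original one (a proxy), then do some negotiation, and either simply continue on the same connection or tunnel the real connection through some other protocol (using wrapping streams). We instead want to let some other socket factory do the original connecting, and then do only the stream wrapping ourselves.

RMI has separate interfaces for the client and server socket factories. The client socket factories will be serialized and passed from the server to the client together with the remote stubs, allowing the client to reach the server.

There is also an RMISocketFactory abstract class implementing both interfaces, and providing a VM-global default socket factory which will be used for all remote objects which don't have their own.

We will now implement a subclass of this class (thereby also implementing both interfaces), allowing the user to give a base client and server socket factory, which we then will use. Our class must be serializable to allow passing it to the clients.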

/**
 * A base class for RMI socket factories which do their
 * work by wrapping the streams of Sockets from another
 * Socket factory.
 *
 * Subclasses have to override the {@link #wrap} method.
 *
 * Instances of this class can be used as both client and
 * server socket factories, or as only one of them.
 */
public abstract class WrappingSocketFactory 
    extends RMISocketFactory
    implements Serializable
{

(Imagine all the rest indented relative to this class.)

As we want to refer to other factories, here are the fields.

/**
 * The base client socket factory. This will be serialized.
 */
private RMIClientSocketFactory baseCFactory;

/**
 * The base server socket factory. This will not be serialized,
 * since the server socket factory is used only on the server side.
 */
private transient RMIServerSocketFactory baseSFactory;

These will be initialized by straightforward constructors (which I don't repeat here; look at the github repository for the full code).
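The effect of marking the server factory field transient can be tried in isolation. The following is only a minimal sketch: the Holder class and its two String fields are hypothetical stand-ins for the factory and its two base factories, not part of the original code. After a serialization round trip (which is roughly what happens when the stub travels to the client), the transient field comes back as null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TransientFieldDemo {

    /** Hypothetical stand-in for a factory with a client part and a server part. */
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;

        String clientPart;            // serialized, travels with the stub
        transient String serverPart;  // skipped by serialization

        Holder(String clientPart, String serverPart) {
            this.clientPart = clientPart;
            this.serverPart = serverPart;
        }
    }

    /** Serializes the holder to a byte array and reads it back. */
    static Holder roundTrip(Holder original)
        throws IOException, ClassNotFoundException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                 new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Holder) in.readObject();
        }
    }
}
```

This is why code on the client side must never rely on the server factory field being set.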

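Abstract wrap method

To keep this "wrapping of socket factories" general, we do only the general mechanism here, and do the actual wrapping of the streams in subclasses. Then we can have a compressing/decompressing subclass, an encrypting one, a logging one, etc.

Here we only declare the wrap method: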

/**
 * Wraps a pair of streams.
 * Subclasses must implement this method to do the actual
 * work.
 * @param input the input stream from the base socket.
 * @param output the output stream to the base socket.
 * @param server if true, we are constructing a socket in
 *    {@link ServerSocket#accept}. If false, this is a pure
 *   client socket.
 */
protected abstract StreamPair wrap(InputStream input,
                                   OutputStream output,
                                   boolean server);

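This method (and the fact that Java doesn't allow multiple return values) is the reason for the StreamPair class. Alternatively we could have two separate methods, but in some cases (as for SSL) it is necessary to know which two streams are paired.

Client Socket Factory

Now, let's have a look at the client socket factory implementation: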

/**
 * Creates a client socket and connects it to the given host/port pair.
 *
 * This retrieves a socket to the host/port from the base client
 * socket factory and then wraps a new socket (with a custom SocketImpl)
 * around it.
 * @param host the host we want to be connected with.
 * @param port the port we want to be connected with.
 * @return a new Socket connected to the host/port pair.
 * @throws IOException if something goes wrong.
 */
public Socket createSocket(String host, int port)
    throws IOException
{
    Socket baseSocket = baseCFactory.createSocket(host, port);

We retrieve a socket from our base factory, and then ...

    StreamPair streams = this.wrap(baseSocket.getInputStream(),
                                   baseSocket.getOutputStream(),
                                   false);

... wrap its streams with new streams. (This wrap has to be implemented by subclasses, see below.)

    SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);

Then we use these streams to create our WrappingSocketImpl (see above), and pass it ...

    return new Socket(wrappingImpl) {
        public boolean isConnected() { return true; }
    };

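... to a new Socket. We have to subclass Socket because this constructor is protected, but this is convenient since we also have to override the isConnected method to return true instead of false. (Remember, our SocketImpl is already connected, and does not support connecting.)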

}

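For client socket factories, this is already enough. For server socket factories, it gets a bit more complicated.

Wrapping ServerSockets

There seems to be no way to create a ServerSocket with a given SocketImpl object; it always uses the static SocketImplFactory. Thus we now subclass ServerSocket, simply ignoring its SocketImpl and delegating to another ServerSocket instead.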

/**
 * A server socket subclass which wraps our custom sockets around the
 * sockets retrieved by a base server socket.
 *
 * We only override enough methods to work. Basically, this is
 * an unbound server socket, which handles {@link #accept} specially.
 */
private class WrappingServerSocket extends ServerSocket {
    private ServerSocket base;

    public WrappingServerSocket(ServerSocket b)
        throws IOException
    {
        this.base = b;
    }

It turns out we have to implement getLocalPort, since this number is sent with the remote stub to the clients.

    /**
     * returns the local port this ServerSocket is bound to.
     */
    public int getLocalPort() {
        return base.getLocalPort();
    }

The next method is the important one. It works similarly to our createSocket() method above.

    /**
     * accepts a connection from some remote host.
     * This will accept a socket from the base socket, and then
     * wrap a new custom socket around it.
     */
    public Socket accept() throws IOException {

We let the base ServerSocket accept a connection, then wrap its streams:

        final Socket baseSocket = base.accept();
        StreamPair streams =
            WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
                                            baseSocket.getOutputStream(),
                                            true);

Then we create our WrappingSocketImpl, ...

        SocketImpl wrappingImpl =
            new WrappingSocketImpl(streams, baseSocket);

... and create another anonymous subclass of Socket:

        // For some reason, this seems to work only as an
        // anonymous direct subclass of Socket, not as an
        // external subclass. Strange.
        Socket result = new Socket(wrappingImpl) {
                public boolean isConnected() { return true; }
                public boolean isBound() { return true; }
                public int getLocalPort() {
                    return baseSocket.getLocalPort();
                }
                public InetAddress getLocalAddress() {
                    return baseSocket.getLocalAddress();
                }
            };

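This one needs some more overridden methods, as these are called by the RMI engine, it seems.

I tried to put these in a separate (non-local) class, but this did not work (it gave exceptions on the client side when connecting). I have no idea why. If someone has an idea, I'm interested.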

        return result;
    }
}

Having this ServerSocket subclass, we can complete our ...

wrapping RMI server socket factory

/**
 * Creates a server socket listening on the given port.
 *
 * This retrieves a ServerSocket listening on the given port
 * from the base server socket factory, and then creates a 
 * custom server socket, which on {@link ServerSocket#accept accept}
 * wraps new Sockets (with a custom SocketImpl) around the sockets
 * from the base server socket.
 * @param port the port the server socket should listen on.
 * @return a new ServerSocket listening on the given port.
 * @throws IOException if something goes wrong.
 */
public ServerSocket createServerSocket(int port)
    throws IOException
{
    final ServerSocket baseSocket = getSSFac().createServerSocket(port);
    ServerSocket ss = new WrappingServerSocket(baseSocket);
    return ss;
}

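Not much to say, it is all in the comments already. Yes, I know I could do this all in one line. (There originally was some debugging output between the lines.)

Let's finish the class:

}

Next time: a tracing socket factory.


A tracing socket factory.

To test our wrapping and to see if there are enough flushes, here is the wrap method of a first subclass: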

protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
    InputStream wrappedIn = in;
    OutputStream wrappedOut = new FilterOutputStream(out) {
            public void write(int b) throws IOException {
                System.err.println("write(.)");
                super.write(b);
            }
            public void write(byte[] b, int off, int len)
                throws IOException {
                System.err.println("write(" + len + ")");
                super.out.write(b, off, len);
            }
            public void flush() throws IOException {
                System.err.println("flush()");
                super.flush();
            }
        };
    return new StreamPair(wrappedIn, wrappedOut);
}

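The input stream is used as it is; the output stream simply adds some logging.

On the server side it looks like this (the [example] prefix comes from ant):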

  [example] write(14)
  [example] flush()
  [example] write(287)
  [example] flush()
  [example] flush()
  [example] flush()
  [example] write(1)
  [example] flush()
  [example] write(425)
  [example] flush()
  [example] flush()

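We see that there are enough flushes, even more than enough. (The numbers are the lengths of the output chunks.)
(On the client side, this actually threw a java.rmi.NoSuchObjectException. It worked before ... no idea why it doesn't work now. As the compression example does work and I'm tired, I won't search for the cause now.)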
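The tracing wrapper can also be tried without any RMI machinery. The sketch below is an assumption-laden variant of the wrap method above: it records the calls in a list instead of printing to System.err, and writes into a ByteArrayOutputStream instead of a socket stream (the class and method names are made up for this illustration). It also shows why write(byte[], int, int) is overridden to call the underlying stream directly: the default FilterOutputStream implementation would forward the data one byte at a time through write(int).

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

class TracingDemo {

    /** Writes the data through a tracing stream, flushes, and returns the call log. */
    static List<String> trace(byte[] data) throws IOException {
        final List<String> log = new ArrayList<>();
        OutputStream traced =
            new FilterOutputStream(new ByteArrayOutputStream()) {
                @Override
                public void write(int b) throws IOException {
                    log.add("write(.)");
                    out.write(b);
                }

                @Override
                public void write(byte[] b, int off, int len)
                    throws IOException {
                    log.add("write(" + len + ")");
                    // Bypass FilterOutputStream's byte-by-byte default.
                    out.write(b, off, len);
                }

                @Override
                public void flush() throws IOException {
                    log.add("flush()");
                    out.flush();
                }
            };
        traced.write(data);   // delegates to write(data, 0, data.length)
        traced.flush();
        return log;
    }
}
```

Calling TracingDemo.trace(new byte[14]) records one write(14) entry followed by one flush() entry, the same pattern as the first two lines of the log above.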

Next: compression.


Flushing compressed streams

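For compression, Java has some classes in the java.util.zip package. There is the pair DeflaterOutputStream / InflaterInputStream, which implement the deflate compression algorithm by wrapping another stream and filtering the data through a Deflater or an Inflater, respectively. Deflater and Inflater are based on native methods calling the common zlib library. (Actually, the streams could also support other algorithms, if someone provided subclasses with alternative implementations of Deflater and Inflater.)

(There are also DeflaterInputStream and InflaterOutputStream, which work the other way around.)

Based on this, GZIPOutputStream and GZIPInputStream implement the GZip file format. (This mainly adds a header, a footer, and a checksum.)

Both output streams have the problem (for our use case) that they don't really support flush(). This is caused by a deficiency in the API definition of Deflater, which is allowed to buffer as much data as it wants until the final finish(). Zlib itself allows flushing its state; the Java wrapper simply does not expose this.

Since January 1999 there has been bug #4206909 open about this, and it looks like it was finally fixed for Java 7, hurray! If you have Java 7, you can simply use DeflaterOutputStream here (constructed with its syncFlush parameter set to true, so that flush() really pushes out the pending data).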
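For readers on Java 7 or later, the difference can be observed directly. This is a minimal sketch, not the code from the repository: it uses the three-argument DeflaterOutputStream constructor (added in Java 7) with a raw (nowrap) Deflater, writes some bytes, calls flush() without closing the stream, and then inflates whatever has reached the sink so far. The class and method names are invented for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;

class SyncFlushDemo {

    /**
     * Compresses the data, flushes (but does not close) the stream and
     * returns the bytes which arrived at the underlying sink so far.
     */
    static byte[] compressAndFlush(byte[] data, boolean syncFlush)
        throws IOException
    {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        try {
            DeflaterOutputStream out =
                new DeflaterOutputStream(sink, deflater, syncFlush);
            out.write(data);
            out.flush();   // stream intentionally left open
            return sink.toByteArray();
        } finally {
            deflater.end();
        }
    }

    /** Inflates as much as the (possibly unfinished) input allows. */
    static byte[] inflate(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater(true);
        try {
            inflater.setInput(compressed);
            byte[] buffer = new byte[1024];   // enough for this small demo
            int n = inflater.inflate(buffer);
            return Arrays.copyOf(buffer, n);
        } finally {
            inflater.end();
        }
    }
}
```

With syncFlush set to true, the flushed bytes are already enough for the receiving side to recover the complete message. With false, a short message typically stays inside the Deflater's buffer, which is exactly the behaviour that stalls an RMI call.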

As I don't have Java 7 yet, I'll use the workaround posted in the bug comments on 23-JUN-2002 by rsaddey.

/**
 * Workaround for the broken GZipOutputStream, from
 * https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see DecompressingInputStream
 */
public class CompressingOutputStream
    extends DeflaterOutputStream {


    public CompressingOutputStream (final OutputStream out)
    {
        super(out,
              // Using Deflater with nowrap == true will omit headers
              //  and trailers
              new Deflater(Deflater.DEFAULT_COMPRESSION, true));
    }

    private static final byte [] EMPTYBYTEARRAY = new byte[0];
    /**
     * Ensure all remaining data will be output.
     */
    public void flush() throws IOException {
        /**
         * Now this is tricky: We force the Deflater to flush
         * its data by switching compression level.
         * As yet, a perplexingly simple workaround for 
         *  http://developer.java.sun.com/developer/bugParade/bugs/4255743.html 
        */
        def.setInput(EMPTYBYTEARRAY, 0, 0);

        def.setLevel(Deflater.NO_COMPRESSION);
        deflate();

        def.setLevel(Deflater.DEFAULT_COMPRESSION);
        deflate();

        out.flush();
    }

    /**
     * We also end the Deflater (which we created ourselves) when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        def.end();
    }

} // class

/**
 * Workaround for the broken GZipOutputStream, from
 * https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see CompressingOutputStream
 */
public class DecompressingInputStream extends InflaterInputStream {

    public DecompressingInputStream (final InputStream in) {
        // Using Inflater with nowrap == true will omit headers and trailers
        super(in, new Inflater(true));
    }

    /**
     * available() should return the number of bytes that can be read without
     * running into blocking wait. Accomplishing this feat would eventually
     * require to pre-inflate a huge chunk of data, so we rather opt for a
     * more relaxed contract (java.util.zip.InflaterInputStream does not 
     * fit the bill). 
     * This code has been tested to work with BufferedReader.readLine();
     */
    public int available() throws IOException {
        if (!inf.finished() && !inf.needsInput()) {
            return 1;
        } else {
            return in.available();
        }
    }

    /**
     * We also end the Inflater (which we created ourselves) when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        inf.end();
    }

} //class

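(These are in the de.fencing_game.tools package in my github repository. The version there has some German comments, as I originally copied this a year ago for another project of mine.)

Searching a bit on Stackoverflow, I found an answer by BalusC to a related question, which offers another compressing output stream with optimized flushing. I did not test this, but it might be an alternative to this one. (It uses the gzip format, while we are using the pure deflate format here. Make sure the writing and the reading stream fit together.)

Another alternative would be using JZlib, as bestsss proposed, with its ZOutputStream and ZInputStream. It does not have much documentation, but I'm working on it.

Next time: a compressed RMI socket factory.


Compressed RMI socket factory

Now we can pull it all together.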

/**
 * An RMISocketFactory which enables compressed transmission.
 * We use {@link DecompressingInputStream} and {@link CompressingOutputStream}
 * for this.
 *
 * As we extend WrappingSocketFactory, this can be used on top of another
 * {@link RMISocketFactory}.
 */
public class CompressedRMISocketFactory
    extends WrappingSocketFactory
{

    private static final long serialVersionUID = 1;

    //------------ Constructors -----------------

    /**
     * Creates a CompressedRMISocketFactory based on a pair of
     * socket factories.
     *
     * @param cFac the base socket factory used for creating client
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default socket factory}
     *  of client system where this object is finally used for
     *   creating sockets.
     *   If not null, it should be serializable.
     * @param sFac the base socket factory used for creating server
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default RMI Socket factory}.
     *  This will not be serialized to the client.
     */
    public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
                                      RMIServerSocketFactory sFac) {
        super(cFac, sFac);
    }

    // [snipped more constructors]

    //-------------- Implementation -------------

    /**
     * wraps a pair of streams into compressing/decompressing streams.
     */
    protected StreamPair wrap(InputStream in, OutputStream out,
                              boolean server)
    {
        return new StreamPair(new DecompressingInputStream(in),
                              new CompressingOutputStream(out));
    }
}

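That's it. We now provide this factory object to UnicastRemoteObject.exportObject(...) as arguments (both as the client and as the server factory), and all the communication will be compressed. (The version in my github repository has a main method with an example.)

Of course, the compression benefit will not be huge for things like RMI, at least when you don't transfer large strings or similar things as arguments or return values.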
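To get a feeling for that trade-off, one can measure how many bytes a single flushed message occupies on the wire. The following sketch is an illustration under assumptions, not part of the original code: it uses the Java 7 syncFlush constructor instead of the workaround stream, and the helper name compressedSize is made up here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

class CompressionBenefitDemo {

    /** Returns the number of bytes one flushed message takes when deflated. */
    static int compressedSize(byte[] payload) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
        try {
            DeflaterOutputStream out =
                new DeflaterOutputStream(sink, deflater, true);
            out.write(payload);
            out.flush();   // one flush per message, as RMI does per call
            return sink.size();
        } finally {
            deflater.end();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] tiny = {1, 2, 3, 4};
        byte[] repetitive = new byte[10000];
        Arrays.fill(repetitive, (byte) 'a');

        System.out.println("tiny:       " + tiny.length
                           + " -> " + compressedSize(tiny));
        System.out.println("repetitive: " + repetitive.length
                           + " -> " + compressedSize(repetitive));
    }
}
```

A tiny message grows, because every flush adds block overhead, while a large repetitive one shrinks dramatically. Typical RMI calls sit much closer to the first case.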

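Next time (after I have slept): combining with an SSL socket factory.


Combining with an SSL socket factory

The Java part of this is easy, if we use the default classes: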

CompressedRMISocketFactory fac =
    new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
                   new SslRMIServerSocketFactory());

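These classes (in javax.rmi.ssl) use the default SSLSocketFactory and SSLServerSocketFactory (in javax.net.ssl), which use the system's default keystore and truststore.

Thus it is necessary to create a key store with a key pair (for example with keytool -genkeypair -v), and to provide it to the VM with the system properties javax.net.ssl.keyStore (the file name of the key store) and javax.net.ssl.keyStorePassword (the password for the key store).

On the client side, we need a trust store, i.e. a key store containing the public key, or some certificate which signed the public key of the server. For testing purposes, we can simply use the same key store as on the server; for production, you certainly would not want the server's private key on the client side. We provide it with the properties javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword.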
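These properties are usually passed with -D options on the java command line, but they can also be set from code. The sketch below assumes this is done before the first SSL socket is created, since the default SSL context is expected to read the properties when it is first initialized. The class name, method names, file names and passwords are placeholders, not values from the original example.

```java
class SslStoreProperties {

    /** Server side: key store holding the server's key pair. */
    static void useKeyStore(String path, String password) {
        System.setProperty("javax.net.ssl.keyStore", path);
        System.setProperty("javax.net.ssl.keyStorePassword", password);
    }

    /** Client side: trust store holding the server's certificate. */
    static void useTrustStore(String path, String password) {
        System.setProperty("javax.net.ssl.trustStore", path);
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }

    public static void main(String[] args) {
        // Hypothetical file names and passwords, for illustration only.
        useKeyStore("server-keystore.jks", "changeit");
        useTrustStore("client-truststore.jks", "changeit");
        System.out.println(System.getProperty("javax.net.ssl.keyStore"));
    }
}
```

Hard-coding a password like this is only acceptable for a test setup.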

Then it comes down to this (on the server side):

    Remote server =
        UnicastRemoteObject.exportObject(new EchoServerImpl(),
                                         0, fac, fac);
    System.err.println("server: " + server);

    Registry registry =
        LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
    
    registry.bind("echo", server);

The client is a stock client, as for the previous examples:

    Registry registry =
        LocateRegistry.getRegistry("localhost",
                                   Registry.REGISTRY_PORT);
    
    EchoServer es = (EchoServer)registry.lookup("echo");
    System.err.println("es: " + es);
    System.out.println(es.echo("hallo"));

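Now all communication with the EchoServer runs compressed and encrypted.
Of course, for complete security we would also want the communication with the registry to be SSL-protected, to avoid man-in-the-middle attacks (which would also allow intercepting the communication with the EchoServer, by giving the client a fake RMIClientSocketFactory or a fake server address).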
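One possible way to protect the registry as well, sketched here under assumptions rather than taken from the original example, is to use the LocateRegistry overloads which accept socket factories. The helper names are invented, and the key store and trust store properties described above still have to be set for the SSL handshake to succeed.

```java
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;

class SecureRegistryDemo {

    /** Server side: a registry which only accepts SSL connections. */
    static Registry createSecureRegistry(int port) throws RemoteException {
        return LocateRegistry.createRegistry(port,
                                             new SslRMIClientSocketFactory(),
                                             new SslRMIServerSocketFactory());
    }

    /** Client side: a registry stub which connects through SSL. */
    static Registry locateSecureRegistry(String host, int port)
        throws RemoteException
    {
        return LocateRegistry.getRegistry(host, port,
                                          new SslRMIClientSocketFactory());
    }
}
```

The client then has to use locateSecureRegistry instead of the plain LocateRegistry.getRegistry(host, port) call shown above, since a plain client socket cannot talk to an SSL-only registry.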


We have several problems here:

  • We can't simply wrap SocketFactories around each other, like we can do for InputStreams and OutputStreams.
  • Java's zlib-based DeflatorOutputStream does not implement flushing.

I think I found a mechanism how this would seems to work.

This will be a some-part series, as it needs some time to write.
(You can find the source code of the completed stuff in my github repository).

A custom SocketImpl

A Socket always is based by an object implementing SocketImpl. Thus, having a custom socket in fact means using a custom SocketImpl class. Here is an implementation based on a pair of streams (and a base socket, for closing purposes):

/**
 * A SocketImpl implementation which works on a pair
 * of streams.
 *
 * A instance of this class represents an already
 * connected socket, thus all the methods relating to
 * connecting, accepting and such are not implemented.
 *
 * The implemented methods are {@link #getInputStream},
 * {@link #getOutputStream}, {@link #available} and the
 * shutdown methods {@link #close}, {@link #shutdownInput},
 * {@link #shutdownOutput}.
 */
private static class WrappingSocketImpl extends SocketImpl {
    private InputStream inStream;
    private OutputStream outStream;

    private Socket base;
    
    WrappingSocketImpl(StreamPair pair, Socket base) {
        this.inStream = pair.input;
        this.outStream = pair.output;
        this.base = base;
    }

A StreamPair is a simple data holder class, see below.

These are the important methods:

    protected InputStream getInputStream() {
        return inStream;
    }

    protected OutputStream getOutputStream() {
        return outStream;
    }

    protected int available() throws IOException {
        return inStream.available();
    }

Then some methods to allow closing. These are not really tested (maybe we should also close or at least flush the streams?), but it seems to work for our RMI usage.

    protected void close() throws IOException {
        base.close();
    }

    protected void shutdownInput() throws IOException {
        base.shutdownInput();
        // TODO: inStream.close() ?
    }

    protected void shutdownOutput() throws IOException {
        base.shutdownOutput();
        // TODO: outStream.close()?
    }

The next some methods will be called by the Socket constructor (or indirectly by something in the RMI engine), but don't really have to do anything.

    protected void create(boolean stream) {
        if(!stream) {
            throw new IllegalArgumentException("datagram socket not supported.");
        }
    }

    public Object getOption(int optID) {
        System.err.println("getOption(" + optID + ")");
        return null;
    }

    public void setOption(int optID, Object value) {
        // noop, as we don't have any options.
    }

All the remaining methods are not necessary, we implement them throwing Exceptions (so we will notice if this assumption was wrong).

    // unsupported operations

    protected void connect(String host, int port) {
        System.err.println("connect(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }


    protected void connect(InetAddress address, int port) {
        System.err.println("connect(" + address + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void connect(SocketAddress addr, int timeout) {
        System.err.println("connect(" + addr + ", " + timeout + ")");
        throw new UnsupportedOperationException();
    }

    protected void bind(InetAddress host, int port) {
        System.err.println("bind(" + host + ", " + port + ")");
        throw new UnsupportedOperationException();
    }

    protected void listen(int backlog) {
        System.err.println("listen(" + backlog + ")");
        throw new UnsupportedOperationException();
    }

    protected void accept(SocketImpl otherSide) {
        System.err.println("accept(" + otherSide + ")");
        throw new UnsupportedOperationException();
    }

    protected void sendUrgentData(int data) {
        System.err.println("sendUrgentData()");
        throw new UnsupportedOperationException();
    }
}

Here is the StreamPair used by the constructor:

/**
 * A simple holder class for a pair of streams.
 */
public static class StreamPair {
    public InputStream input;
    public OutputStream output;
    public StreamPair(InputStream in, OutputStream out) {
        this.input = in; this.output = out;
    }
}

Next part: use this to implement a Socket factory.


A Socket factory, wrapping another one.

We are dealing here with RMI socket factories (i.e. RMIClientSocketFactory, RMIServerSocketFactory, RMISocketFactory in java.rmi.server), but the same idea applies to other libraries using a socket factory interface as well. Examples are javax.net.SocketFactory (and ServerSocketFactory), Apache Axis' SocketFactory, JSch's SocketFactory.

Often, the idea of these factories is that they somehow connect to another server than the original one (a proxy), then do some negotiating and either simple can continue now in the same connection or have to tunnel the real connection through some other protocol (using wrapping streams). We instead want to let some other socket factory do the original connecting, and then do only the stream wrapping ourselves.

RMI has separate interfaces for the client and server socket factories. The client socket factories will be serialized and passed from the server to the client together with the remote stubs, allowing the client to reach the server.

There is also a RMISocketFactory abstract class implementing both interfaces, and providing a VM-global default socket factory which will be used for all remote objects which don't have their own ones.

We will now implement a subclass of this class (and thereby also implementing both interfaces), allowing the user to give a base client and server socket factory, which we then will use. Our class must be serializable to allow passing it to the clients.

/**
 * A base class for RMI socket factories which do their
 * work by wrapping the streams of Sockets from another
 * Socket factory.
 *
 * Subclasses have to overwrite the {@link #wrap} method.
 *
 * Instances of this class can be used as both client and
 * server socket factories, or as only one of them.
 */
public abstract class WrappingSocketFactory 
    extends RMISocketFactory
    implements Serializable
{

(Imagine all the rest indented relative to this class.)

As we want to refer to other factories, here the fields.

/**
 * The base client socket factory. This will be serialized.
 */
private RMIClientSocketFactory baseCFactory;

/**
 * The base server socket factory. This will not be serialized,
 * since the server socket factory is used only on the server side.
 */
private transient RMIServerSocketFactory baseSFactory;

These will be initialized by straightforward constructors (which I don't repeat here - look at the github repository for the full code).

Abstract wrap method

To let this "wrapping of socket factories" be general, we do only the general mechanism here, and do the actual wrapping of the streams in subclasses. Then we can have a compressing/decompressing subclass, a encrypting one, a logging one, etc.

Here we only declare the wrap method:

/**
 * Wraps a pair of streams.
 * Subclasses must implement this method to do the actual
 * work.
 * @param input the input stream from the base socket.
 * @param output the output stream to the base socket.
 * @param server if true, we are constructing a socket in
 *    {@link ServerSocket#accept}. If false, this is a pure
 *   client socket.
 */
protected abstract StreamPair wrap(InputStream input,
                                   OutputStream output,
                                   boolean server);

This method (and the fact that Java doesn't allow multiple return values) is the reason for the StreamPair class. Alternatively we could have two separate methods, but in some cases (as for SSL) it is necessary to know which two streams are paired.

Client Socket Factory

Now, lets have a look at the client socket factory implementation:

/**
 * Creates a client socket and connects it to the given host/port pair.
 *
 * This retrieves a socket to the host/port from the base client
 * socket factory and then wraps a new socket (with a custom SocketImpl)
 * around it.
 * @param host the host we want to be connected with.
 * @param port the port we want to be connected with.
 * @return a new Socket connected to the host/port pair.
 * @throws IOException if something goes wrong.
 */
public Socket createSocket(String host, int port)
    throws IOException
{
    Socket baseSocket = baseCFactory.createSocket(host, port);

We retrieve a socket from our base factory, and then ...

    StreamPair streams = this.wrap(baseSocket.getInputStream(),
                                   baseSocket.getOutputStream(),
                                   false);

... wrap its streams by new streams. (This wrap has to be implemented by subclasses, see below).

    SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);

Then we use these streams to create our WrappingSocketImpl (see above), and pass it ...

    return new Socket(wrappingImpl) {
        public boolean isConnected() { return true; }
    };

... to a new Socket. We have to subclass Socket because this constructor is protected, but this is opportune since we also have to override the isConnected method to return true instead of false. (Remember, our SocketImpl is already connected, and does not support connecting.)

}

For client socket factories, this is already enough. For server socket factories, it gets a bit more complicated.

Wrapping ServerSockets

There seems to be no way to create a ServerSocket with a given SocketImpl object - it always uses the static SocketImplFactory. Thus we now subclass ServerSocket, simply ignoring its SocketImpl, instead delegating to another ServerSocket.

/**
 * A server socket subclass which wraps our custom sockets around the
 * sockets retrieves by a base server socket.
 *
 * We only override enough methods to work. Basically, this is
 * a unbound server socket, which handles {@link #accept} specially.
 */
private class WrappingServerSocket extends ServerSocket {
    private ServerSocket base;

    public WrappingServerSocket(ServerSocket b)
        throws IOException
    {
        this.base = b;
    }

It turns out we have to implement this getLocalPort, since this number is sent with the remote stub to the clients.

    /**
     * returns the local port this ServerSocket is bound to.
     */
    public int getLocalPort() {
        return base.getLocalPort();
    }

The next method is the important one. It works similar to our createSocket() method above.

    /**
     * accepts a connection from some remote host.
     * This will accept a socket from the base socket, and then
     * wrap a new custom socket around it.
     */
    public Socket accept() throws IOException {

We let the base ServerSocket accept a connection, then wrap its streams:

        final Socket baseSocket = base.accept();
        StreamPair streams =
            WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
                                            baseSocket.getOutputStream(),
                                            true);

Then we create our WrappingSocketImpl, ...

        SocketImpl wrappingImpl =
            new WrappingSocketImpl(streams, baseSocket);

... and create another anonymous subclass of Socket:

        // For some reason, this seems to work only as an
        // anonymous direct subclass of Socket, not as an
        // external subclass. Strange.
        Socket result = new Socket(wrappingImpl) {
                public boolean isConnected() { return true; }
                public boolean isBound() { return true; }
                public int getLocalPort() {
                    return baseSocket.getLocalPort();
                }
                public InetAddress getLocalAddress() {
                    return baseSocket.getLocalAddress();
                }
            };

This one needs some more overridden methods, as these are called by the RMI engine, it seems.

I tried to put these in a separate (non-local) class, but this did not work (gave exceptions at the client side on connecting). I have no idea why. If someone has an idea, I'm interested.

        return result;
    }
}

Having this ServerSocket subclass, we can complete our ...

wrapping RMI server socket factory

/**
 * Creates a server socket listening on the given port.
 *
 * This retrieves a ServerSocket listening on the given port
 * from the base server socket factory, and then creates a 
 * custom server socket, which on {@link ServerSocket#accept accept}
 * wraps new Sockets (with a custom SocketImpl) around the sockets
 * from the base server socket.
 * @param port the port the server socket should listen on.
 * @return a new ServerSocket listening on the given port.
 * @throws IOException if something goes wrong.
 */
public ServerSocket createServerSocket(int port)
    throws IOException
{
    final ServerSocket baseSocket = getSSFac().createServerSocket(port);
    ServerSocket ss = new WrappingServerSocket(baseSocket);
    return ss;
}

Not much to say, it all is already in the comment. Yes, I know I could do this all in one line. (There originally were some debugging outputs between the lines.)

Let's finish the class:

}

Next time: a tracing socket factory.


A tracing socket factory.

To test our wrapping and see whether there are enough flushes, here is the wrap method of a first subclass:

protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
    InputStream wrappedIn = in;
    OutputStream wrappedOut = new FilterOutputStream(out) {
            public void write(int b) throws IOException {
                System.err.println("write(.)");
                super.write(b);
            }
            public void write(byte[] b, int off, int len)
                throws IOException {
                System.err.println("write(" + len + ")");
                super.out.write(b, off, len);
            }
            public void flush() throws IOException {
                System.err.println("flush()");
                super.flush();
            }
        };
    return new StreamPair(wrappedIn, wrappedOut);
}

The input stream is used as is, the output stream simply adds some logging.

On the server side, it looks like this (the [example] comes from ant):

  [example] write(14)
  [example] flush()
  [example] write(287)
  [example] flush()
  [example] flush()
  [example] flush()
  [example] write(1)
  [example] flush()
  [example] write(425)
  [example] flush()
  [example] flush()

We see that there are enough flushes, even more than enough. (The numbers are the lengths of the output chunks.)
(On client side, this actually throws a java.rmi.NoSuchObjectException. It worked before ... no idea why it doesn't work now. As the compressing example does work and I'm tired, I'll not search for it now.)

Next: compressing.


Flushing compressed streams

For compression, Java has some classes in the java.util.zip package. There is the pair DeflaterOutputStream / InflaterInputStream which implement the deflate compression algorithm by wrapping another stream, filtering the data through a Deflater or Inflater, respectively. Deflater and Inflater are based on native methods calling the common zlib library. (Actually, the streams could also support other algorithms, if someone provided subclasses with alternate implementations of Deflater and Inflater.)

(There are also DeflaterInputStream and InflaterOutputStream, which work the other way around.)

Based on this, GZIPOutputStream and GZIPInputStream implement the GZip file format. (This mainly adds a header, a footer, and a checksum.)

Both output streams have the problem (for our use case) that they don't truly support flush(). This is caused by a deficiency in the API definition of Deflater, which is allowed to buffer as much data as it wants until the final finish(). Zlib allows flushing its state, just the Java wrapper is too stupid.

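There is bug #4206909 open about this since January 1999, and it looks like it is finally fixed for Java 7, hurray! If you have Java 7, you can simply use DeflaterOutputStream here, created with its syncFlush option enabled so that flush() really pushes out the pending compressed data.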
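For readers who do have Java 7 or later, here is a minimal sketch of what that could look like. The class and method names (SyncFlushSketch, compressAndFlush, readBack) are made up for this illustration and are not part of my factory code. It uses the default zlib wrapper on both sides; to match a headerless (nowrap) reader, one would instead pass a matching Deflater to the three-argument constructor DeflaterOutputStream(out, deflater, true).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class SyncFlushSketch {

    /**
     * Writes the data, flushes WITHOUT closing, and returns the bytes
     * that had reached the underlying stream at that point.
     */
    static byte[] compressAndFlush(byte[] data) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // The second argument is syncFlush (available since Java 7):
        // flush() then makes the Deflater emit everything it has buffered.
        DeflaterOutputStream out = new DeflaterOutputStream(sink, true);
        out.write(data);
        out.flush();
        byte[] onTheWire = sink.toByteArray(); // snapshot before closing
        out.close();
        return onTheWire;
    }

    /** Reads exactly length bytes back from a flushed (unfinished) stream. */
    static byte[] readBack(byte[] onTheWire, int length) throws IOException {
        InflaterInputStream in =
            new InflaterInputStream(new ByteArrayInputStream(onTheWire));
        byte[] result = new byte[length];
        int off = 0;
        while (off < length) {
            int n = in.read(result, off, length - off);
            if (n < 0) {
                throw new EOFException("stream ended after " + off + " bytes");
            }
            off += n;
        }
        in.close();
        return result;
    }
}
```

The point of the snapshot is that the receiver can decode the complete message from what was sent up to the flush, which is exactly what a request/response protocol like RMI needs.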

Since I don't have Java 7 yet, I'll use the workaround posted in the bug comments on 23-JUN-2002 by rsaddey.

/**
 * Workaround for the broken GZIPOutputStream, from
 * https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see DecompressingInputStream
 */
public class CompressingOutputStream
    extends DeflaterOutputStream {


    public CompressingOutputStream (final OutputStream out)
    {
        super(out,
              // Using Deflater with nowrap == true will omit headers
              //  and trailers
              new Deflater(Deflater.DEFAULT_COMPRESSION, true));
    }

    private static final byte [] EMPTYBYTEARRAY = new byte[0];
    /**
     * Ensure all remaining data will be output.
     */
    public void flush() throws IOException {
        /**
         * Now this is tricky: We force the Deflater to flush
         * its data by switching compression level.
         * As yet, a perplexingly simple workaround for 
         *  http://developer.java.sun.com/developer/bugParade/bugs/4255743.html 
        */
        def.setInput(EMPTYBYTEARRAY, 0, 0);

        def.setLevel(Deflater.NO_COMPRESSION);
        deflate();

        def.setLevel(Deflater.DEFAULT_COMPRESSION);
        deflate();

        out.flush();
    }

    /**
     * We also close the Deflater (which we created ourselves) when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        def.end();
    }

} // class

/**
 * Workaround for the broken GZIPOutputStream, from
 * https://bugs.java.com/bugdatabase/view_bug?bug_id=4206909
 * (23-JUN-2002, rsaddey)
 * @see CompressingOutputStream
 */
public class DecompressingInputStream extends InflaterInputStream {

    public DecompressingInputStream (final InputStream in) {
        // Using Inflater with nowrap == true will omit headers and trailers
        super(in, new Inflater(true));
    }

    /**
     * available() should return the number of bytes that can be read without
     * running into blocking wait. Accomplishing this feat would eventually
     * require to pre-inflate a huge chunk of data, so we rather opt for a
     * more relaxed contract (java.util.zip.InflaterInputStream does not 
     * fit the bill). 
     * This code has been tested to work with BufferedReader.readLine();
     */
    public int available() throws IOException {
        if (!inf.finished() && !inf.needsInput()) {
            return 1;
        } else {
            return in.available();
        }
    }

    /**
     * We also close the Inflater (which we created ourselves) when
     * we are done.
     */
    public void close()
        throws IOException
    {
        super.close();
        inf.end();
    }

} //class

(These are in the de.fencing_game.tools package in my github repository. The versions there have some German comments, since I originally copied them a year ago for another project of mine.)

Searching a bit on Stack Overflow, I found this answer by BalusC to a related question, which offers another compressing OutputStream with optimized flushing. I did not test it, but it might be an alternative to this one. (It uses the gzip format, while we are using the pure deflate format here. Make sure the writing and the reading stream fit together.)

Another alternative would be using JZlib, as bestsss proposed, with its ZOutputStream and ZInputStream. It does not have much documentation, but I'm working on it.

Next time: compressed RMI socket factory


Compressing RMI socket factory

Now we can pull it all together.

/**
 * An RMISocketFactory which enables compressed transmission.
 * We use {@link DecompressingInputStream} and {@link CompressingOutputStream}
 * for this.
 *
 * As we extend WrappingSocketFactory, this can be used on top of another
 * {@link RMISocketFactory}.
 */
public class CompressedRMISocketFactory
    extends WrappingSocketFactory
{

    private static final long serialVersionUID = 1;

    //------------ Constructors -----------------

    /**
     * Creates a CompressedRMISocketFactory based on a pair of
     * socket factories.
     *
     * @param cFac the base socket factory used for creating client
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default socket factory}
     *  of client system where this object is finally used for
     *   creating sockets.
     *   If not null, it should be serializable.
     * @param sFac the base socket factory used for creating server
     *   sockets. This may be {@code null}, then we will use the
     *  {@linkplain RMISocketFactory#getDefault() default RMI Socket factory}.
     *  This will not be serialized to the client.
     */
    public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
                                      RMIServerSocketFactory sFac) {
        super(cFac, sFac);
    }

    // [snipped more constructors]

    //-------------- Implementation -------------

    /**
     * wraps a pair of streams into compressing/decompressing streams.
     */
    protected StreamPair wrap(InputStream in, OutputStream out,
                              boolean server)
    {
        return new StreamPair(new DecompressingInputStream(in),
                              new CompressingOutputStream(out));
    }
}

That's it. We now pass this factory object to UnicastRemoteObject.exportObject(...) as arguments (both as client and as server socket factory), and all the communication will be compressed. (The version in my github repository has a main method with an example.)

Of course, the compression benefits will not be huge for things like RMI, at least when you don't transfer large strings or similar stuff as arguments or return values.
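To get a feeling for this, one can compare compressed sizes for a tiny payload and a large, repetitive one. This is only a rough sketch with a made-up helper class (CompressionBenefitSketch); it compresses each payload as one complete zlib stream, which is not the same as the flushed chunks inside a long-lived RMI connection, so the numbers indicate a tendency rather than what RMI would really send.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.DeflaterOutputStream;

final class CompressionBenefitSketch {

    /** Returns the size of the data after deflate compression (zlib format). */
    static int compressedSize(byte[] data) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Closing the stream finishes the compression and writes the trailer.
        try (DeflaterOutputStream out = new DeflaterOutputStream(sink)) {
            out.write(data);
        }
        return sink.size();
    }

    public static void main(String[] args) throws IOException {
        byte[] small = "hallo".getBytes("UTF-8");
        byte[] large = new byte[10000];
        java.util.Arrays.fill(large, (byte) 'a');

        System.out.println("small: " + small.length + " -> " + compressedSize(small));
        System.out.println("large: " + large.length + " -> " + compressedSize(large));
    }
}
```

A payload of a few bytes comes out larger than it went in, because the format overhead dominates, while the large repetitive array shrinks to a small fraction of its size.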

Next time (after I have slept): combining with an SSL socket factory.


Combining with an SSL socket factory

The Java part of this is easy, if we use the default classes:

CompressedRMISocketFactory fac =
    new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
                   new SslRMIServerSocketFactory());

These classes (in javax.rmi.ssl) use the default SSLSocketFactory and SSLServerSocketFactory (in javax.net.ssl), which use the system's default keystore and trust store.

Thus it is necessary to create a key store with a key pair (for example with keytool -genkeypair -v), and provide this to the VM with the system properties javax.net.ssl.keyStore (the file name of the key store) and javax.net.ssl.keyStorePassword (the password for the key store).

On the client side, we need a trust store, i.e. a key store containing the public keys, or some certificate which signed the public keys of the server. For testing purposes, we can simply use the same keystore as the server; for production you certainly would not want the server's private key on the client side. We provide this with the properties javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword.
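These properties are usually given with -D on the command line, but they can also be set in code. Here is a small sketch of the latter; the class name SslStoreProperties and its methods are hypothetical helpers, and the file names and passwords are whatever you used with keytool. I am assuming the properties are set before the first SSL socket is created, since the default SSL configuration is only read once.

```java
final class SslStoreProperties {

    /** Server side: point the VM to the key store holding the key pair. */
    static void configureServer(String keyStoreFile, String password) {
        System.setProperty("javax.net.ssl.keyStore", keyStoreFile);
        System.setProperty("javax.net.ssl.keyStorePassword", password);
    }

    /** Client side: point the VM to the trust store with the server's certificate. */
    static void configureClient(String trustStoreFile, String password) {
        System.setProperty("javax.net.ssl.trustStore", trustStoreFile);
        System.setProperty("javax.net.ssl.trustStorePassword", password);
    }
}
```

For a quick test with a shared keystore, both methods would be called with the same file name, first thing in the respective main method.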

Then it gets down to this (on the server side):

    Remote server =
        UnicastRemoteObject.exportObject(new EchoServerImpl(),
                                         0, fac, fac);
    System.err.println("server: " + server);

    Registry registry =
        LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
    
    registry.bind("echo", server);

The client is a stock client as for the previous examples:

    Registry registry =
        LocateRegistry.getRegistry("localhost",
                                   Registry.REGISTRY_PORT);
    
    EchoServer es = (EchoServer)registry.lookup("echo");
    System.err.println("es: " + es);
    System.out.println(es.echo("hallo"));

Now all communication to the EchoServer runs compressed and encrypted.
Of course, for complete security we would also want the communication with the registry to be SSL-protected, to avoid man-in-the-middle attacks (which would also allow intercepting the communication with the EchoServer, by giving the client a fake RMIClientSocketFactory or a fake server address).
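I did not do this in my example, but a sketch of an SSL-protected registry could look like the following. LocateRegistry has overloads of createRegistry and getRegistry which take socket factories; the class SslRegistrySketch and its two methods are names I made up for this illustration. It assumes the key store and trust store properties are set up as described above.

```java
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import javax.rmi.ssl.SslRMIClientSocketFactory;
import javax.rmi.ssl.SslRMIServerSocketFactory;

final class SslRegistrySketch {

    /** Server side: creates a registry which only accepts SSL connections. */
    static Registry createSslRegistry(int port) throws RemoteException {
        return LocateRegistry.createRegistry(port,
                                             new SslRMIClientSocketFactory(),
                                             new SslRMIServerSocketFactory());
    }

    /**
     * Client side: obtains a stub which talks SSL to the registry.
     * No connection is made here; that happens on the first lookup().
     */
    static Registry locateSslRegistry(String host, int port)
        throws RemoteException
    {
        return LocateRegistry.getRegistry(host, port,
                                          new SslRMIClientSocketFactory());
    }
}
```

The server would then bind the EchoServer stub in the registry returned by createSslRegistry, and the client would use locateSslRegistry instead of the plain getRegistry call.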

