将 scala 延续与 netty/NIO 监听器结合使用
我正在使用 Netty 库(来自 GitHub 的版本 4)。它在 Scala 中工作得很好,但我希望我的库能够使用连续传递样式来进行异步等待。
传统上,使用 Netty,您会执行以下操作(异步连接操作示例):
//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete (f:ChannelFuture) = {
//here goes the code that happens when the connection is made
}
})
如果您正在实现一个库(我就是),那么您基本上有三个简单的选项来允许库的用户在连接建立后执行操作:
- 只需从 connect 方法返回 ChannelFuture 并让用户处理它 - 这并没有提供来自 netty 的太多抽象。
- 将 ChannelFutureListener 作为 connect 方法的参数并将其添加为 ChannelFuture 的侦听器。
- 将回调函数对象作为 connect 方法的参数,并从您创建的 ChannelFutureListener 中调用它(这将形成回调驱动的风格,有点像 node.js)。
我想做的是第四个选项;我没有将其包括在上面的计数中,因为它并不简单。
我想使用 scala 分隔延续来使库的使用有点像阻塞库,但它将在幕后非阻塞:
class MyLibraryClient {
def connect(remoteAddr:SocketAddress) = {
shift { retrn: (Unit => Unit) => {
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete(f:ChannelFuture) = {
retrn();
}
});
}
}
}
}
想象一下以相同方式实现的其他读/写操作。这样做的目的是让用户的代码看起来更像这样:
reset {
val conn = new MyLibraryClient();
conn.connect(new InetSocketAddress("127.0.0.1", 1337));
println("This will happen after the connection is finished");
}
换句话说,程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程。
我遇到的麻烦是我不完全理解分隔延续的类型是如何工作的。当我尝试以上述方式实现它时,编译器抱怨我的 operationComplete
实现实际上返回 Unit @scala.util.continuations.cpsParam[Unit,Unit =>单位]
而不是单位
。我发现 scala 的 CPS 中存在某种“陷阱”,因为您必须使用 @suspendable
注释 shift
方法的返回类型,该类型会在调用堆栈中向上传递,直到reset
,但似乎没有任何方法可以将其与没有分隔延续概念的现有 Java 库相协调。
我觉得确实必须有一种方法来解决这个问题 - 如果 Swarm 可以序列化延续并将它们堵塞在网络上以便在其他地方计算,那么一定可以简单地从预先存在的 Java 类中调用延续。但我不知道如何做到这一点。为了实现这一点,我是否必须用 Scala 重写 netty 的整个部分?
I'm using the Netty library (version 4 from GitHub). It works great in Scala, but I am hoping for my library to be able to use continuation passing style for the asynchronous waiting.
Traditionally with Netty you would do something like this (an example asynchronous connect operation):
//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete (f:ChannelFuture) = {
//here goes the code that happens when the connection is made
}
})
If you are implementing a library (which I am) then you basically have three simple options to allow the user of the library to do stuff after the connection is made:
- Just return the ChannelFuture from your connect method and let the user deal with it - this doesn't provide much abstraction from netty.
- Take a ChannelFutureListener as a parameter of your connect method and add it as a listener to the ChannelFuture.
- Take a callback function object as a parameter of your connect method and call that from within the ChannelFutureListener that you create (this would make for a callback-driven style somewhat like node.js)
What I am trying to do is a fourth option; I didn't include it in the count above because it is not simple.
I want to use scala delimited continuations to make the use of the library be somewhat like a blocking library, but it will be nonblocking behind the scenes:
class MyLibraryClient {
def connect(remoteAddr:SocketAddress) = {
shift { retrn: (Unit => Unit) => {
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
def operationComplete(f:ChannelFuture) = {
retrn();
}
});
}
}
}
}
Imagine other read/write operations being implemented in the same fashion. The goal of this being that the user's code can look more like this:
reset {
val conn = new MyLibraryClient();
conn.connect(new InetSocketAddress("127.0.0.1", 1337));
println("This will happen after the connection is finished");
}
In other words, the program will look like a simple blocking-style program but behind the scenes there won't be any blocking or threading.
The trouble I'm running into is that I don't fully understand how the typing of delimited continuations work. When I try to implement it in the above way, the compiler complains that my operationComplete
implementation actually returns Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]
instead of Unit
. I get that there is sort of a "gotcha" in scala's CPS in that you must annotate a shift
method's return type with @suspendable
, which gets passed up the call stack until the reset
, but there doesn't seem to be any way to reconcile that with a pre-existing Java library that has no concept of delimited continuations.
I feel like there really must be a way around this - if Swarm can serialize continuations and jam them over the network to be computed elsewhere, then it must be possible to simply call a continuation from a pre-existing Java class. But I can't figure out how it can be done. Would I have to rewrite entire parts of netty in Scala in order to make this happen?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
当我开始时,我发现 Scala 的延续 的解释非常有帮助。特别要注意他解释
shift[A, B, C]
和reset[B, C]
的部分。添加虚拟null
作为operationComplete
的最后一个语句应该会有所帮助。顺便说一句,如果另一个
reset
中可能嵌套有shift
,您需要在另一个reset
中调用retrn()
。工作示例:
编辑:这是一个具有可能输出的
I found this explanation of Scala's continuations extremely helpful when I started out. In particular pay attention to the parts where he explains
shift[A, B, C]
andreset[B, C]
. Adding a dummynull
as the last statement ofoperationComplete
should help.Btw, you need to invoke
retrn()
inside anotherreset
if it may have ashift
nested inside it.Edit: Here is a working example
with a possible output: