将 scala 延续与 netty/NIO 监听器结合使用

发布于 2025-01-03 19:24:31 字数 2053 浏览 5 评论 0原文

我正在使用 Netty 库(来自 GitHub 的版本 4)。它在 Scala 中工作得很好,但我希望我的库能够使用连续传递样式来进行异步等待。

传统上,使用 Netty,您会执行以下操作(异步连接操作示例):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

如果您正在实现一个库(我就是),那么您基本上有三个简单的选项来允许库的用户在连接建立后执行操作:

  1. 只需从 connect 方法返回 ChannelFuture 并让用户处理它 - 这并没有提供来自 netty 的太多抽象。
  2. 将 ChannelFutureListener 作为 connect 方法的参数并将其添加为 ChannelFuture 的侦听器。
  3. 将回调函数对象作为 connect 方法的参数,并从您创建的 ChannelFutureListener 中调用它(这将形成回调驱动的风格,有点像 node.js)。

我想做的是第四个选项;我没有将其包括在上面的计数中,因为它并不简单。

我想使用 scala 分隔延续来使库的使用有点像阻塞库,但它将在幕后非阻塞:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

想象一下以相同方式实现的其他读/写操作。这样做的目的是让用户的代码看起来更像这样:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

换句话说,程序看起来像一个简单的阻塞式程序,但在幕后不会有任何阻塞或线程。

我遇到的麻烦是我不完全理解分隔延续的类型是如何工作的。当我尝试以上述方式实现它时,编译器抱怨我的 operationComplete 实现实际上返回 Unit @scala.util.continuations.cpsParam[Unit,Unit =>单位] 而不是单位。我发现 scala 的 CPS 中存在某种“陷阱”,因为您必须使用 @suspendable 注释 shift 方法的返回类型,该类型会在调用堆栈中向上传递,直到reset,但似乎没有任何方法可以将其与没有分隔延续概念的现有 Java 库相协调。

我觉得确实必须有一种方法来解决这个问题 - 如果 Swarm 可以序列化延续并将它们堵塞在网络上以便在其他地方计算,那么一定可以简单地从预先存在的 Java 类中调用延续。但我不知道如何做到这一点。为了实现这一点,我是否必须用 Scala 重写 netty 的整个部分?

I'm using the Netty library (version 4 from GitHub). It works great in Scala, but I am hoping for my library to be able to use continuation passing style for the asynchronous waiting.

Traditionally with Netty you would do something like this (an example asynchronous connect operation):

//client is a ClientBootstrap
val future:ChannelFuture = client.connect(remoteAddr);
future.addListener(new ChannelFutureListener {
    def operationComplete (f:ChannelFuture) = {
        //here goes the code that happens when the connection is made   
    }
})

If you are implementing a library (which I am) then you basically have three simple options to allow the user of the library to do stuff after the connection is made:

  1. Just return the ChannelFuture from your connect method and let the user deal with it - this doesn't provide much abstraction from netty.
  2. Take a ChannelFutureListener as a parameter of your connect method and add it as a listener to the ChannelFuture.
  3. Take a callback function object as a parameter of your connect method and call that from within the ChannelFutureListener that you create (this would make for a callback-driven style somewhat like node.js)

What I am trying to do is a fourth option; I didn't include it in the count above because it is not simple.

I want to use scala delimited continuations to make the use of the library be somewhat like a blocking library, but it will be nonblocking behind the scenes:

class MyLibraryClient {
    def connect(remoteAddr:SocketAddress) = {
        shift { retrn: (Unit => Unit) => {
                val future:ChannelFuture = client.connect(remoteAddr);
                future.addListener(new ChannelFutureListener {
                    def operationComplete(f:ChannelFuture) = {
                        retrn();
                    }   
                });
            }
        }   
    }
}

Imagine other read/write operations being implemented in the same fashion. The goal of this being that the user's code can look more like this:

reset {
    val conn = new MyLibraryClient();
    conn.connect(new InetSocketAddress("127.0.0.1", 1337));
    println("This will happen after the connection is finished");
}

In other words, the program will look like a simple blocking-style program but behind the scenes there won't be any blocking or threading.

The trouble I'm running into is that I don't fully understand how the typing of delimited continuations work. When I try to implement it in the above way, the compiler complains that my operationComplete implementation actually returns Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit] instead of Unit. I get that there is sort of a "gotcha" in scala's CPS in that you must annotate a shift method's return type with @suspendable, which gets passed up the call stack until the reset, but there doesn't seem to be any way to reconcile that with a pre-existing Java library that has no concept of delimited continuations.

I feel like there really must be a way around this - if Swarm can serialize continuations and jam them over the network to be computed elsewhere, then it must be possible to simply call a continuation from a pre-existing Java class. But I can't figure out how it can be done. Would I have to rewrite entire parts of netty in Scala in order to make this happen?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

辞取 2025-01-10 19:24:31

当我开始时,我发现 Scala 的延续 的解释非常有帮助。特别要注意他解释 shift[A, B, C]reset[B, C] 的部分。添加虚拟 null 作为 operationComplete 的最后一个语句应该会有所帮助。

顺便说一句,如果另一个 reset 中可能嵌套有 shift ,您需要在另一个 reset 中调用 retrn()

工作示例:

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

编辑:这是一个具有可能输出的

Outside reset
operationComplete starts
This will happen after the connection is finished

I found this explanation of Scala's continuations extremely helpful when I started out. In particular pay attention to the parts where he explains shift[A, B, C] and reset[B, C]. Adding a dummy null as the last statement of operationComplete should help.

Btw, you need to invoke retrn() inside another reset if it may have a shift nested inside it.

Edit: Here is a working example

import scala.util.continuations._
import java.util.concurrent.Executors

object Test {

  val execService = Executors.newFixedThreadPool(2)

  def main(args: Array[String]): Unit = {
    reset {
      val conn = new MyLibraryClient();
      conn.connect("127.0.0.1");
      println("This will happen after the connection is finished");
    }
    println("Outside reset");
  }
}

class ChannelFuture {
  def addListener(listener: ChannelFutureListener): Unit = {
    val future = this
    Test.execService.submit(new Runnable {
      def run(): Unit = {
        listener.operationComplete(future)
      }
    })
  }
}

trait ChannelFutureListener {
  def operationComplete(f: ChannelFuture): Unit
}

class MyLibraryClient {
  def connect(remoteAddr: String): Unit@cps[Unit] = {
    shift {
      retrn: (Unit => Unit) => {
        val future: ChannelFuture = new ChannelFuture()
        future.addListener(new ChannelFutureListener {
          def operationComplete(f: ChannelFuture): Unit = {
            println("operationComplete starts")
            retrn();
            null
          }
        });
      }
    }
  }
}

with a possible output:

Outside reset
operationComplete starts
This will happen after the connection is finished
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文