生成器/块到迭代器/流的转换

发布于 2024-09-24 23:06:54 字数 1096 浏览 1 评论 0原文

基本上我想将其转换为: 转换

def data(block: T => Unit)

为 Stream (dataToStream 是执行此转换的假设函数):

val dataStream: Stream[T] = dataToStream(data)

我想这个问题可以通过延续来解决:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

谢谢,Dawid

Basically I want to convert this:

def data(block: T => Unit)

to a Stream (dataToStream is a hypothetical function that do this conversion):

val dataStream: Stream[T] = dataToStream(data)

I suppose this problem could be resolved by continuations:

// let's assume that we don't know how data is implemented
// we just know that it generates integers
def data(block: Int => Unit) { for (i <- 0 to 10) block(i) }

// here we can print all data integers
data { i => println(i) }

// >> but what we really want is to convert data to the stream <<

// very dumb solution is to collect all data into a list
var dataList = List[Int]()
data { i => dataList = i::dataList }
// and make a stream from it
dataList.toStream

// but we want to make a lazy, CPU and memory efficient stream or iterator from data
val dataStream: Stream[Int] = dataToStream(data)
dataStream.foreach { i => println(i) }

// and here a black magic of continuations must be used
// for me this magic is too hard to understand
// Does anybody know how dataToStream function could look like?

Thanks, Dawid

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

丶视觉 2024-10-01 23:06:54

编辑:修改示例以显示 traversable.view 的惰性。

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

toTraversable 方法会将您的数据函数转换为 Traversable 集合。就其本身而言,它并不大,但您可以将其转换为惰性的 TraversableView。下面是一个例子:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

take 方法的不幸本质是它必须超过最后生成的值才能正常工作,但它会提前终止。如果没有“.view”调用,上面的代码看起来是一样的。然而,这里有一个更引人注目的例子:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

所以总而言之,我相信您正在寻找的集合是 TraversableView,它最容易创建视图,制作 Traversable 然后在其上调用“view”。如果您确实想要 Stream 类型,这里有一个在 2.8.0.final 中工作的方法,它将创建一个没有线程的“Stream”:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

该方法的不幸本质是,它将在创建流之前迭代整个可遍历对象。这也意味着所有值都需要缓冲在内存中。唯一的选择是诉诸线程。

顺便说一句:这是更喜欢 Traversables 作为从 scalax.io.File 方法直接返回的动机:“lines”、“chars”和“bytes”。

EDITED: Modified the examples to show the laziness of traversable.view

scala> def data(f : Int => Unit) = for(i <- 1 to 10) {    
     |   println("Generating " + i)
     |   f(i)
     | }
data: (f: (Int) => Unit)Unit

scala> def toTraversable[T]( func : (T => Unit) => Unit) = new Traversable[T] {
     |   def foreach[X]( f : T => X) = func(f(_) : Unit)                       
     | }                                                                       
toTraversable: [T](func: ((T) => Unit) => Unit)java.lang.Object with Traversable[T]

The toTraversable method will convert your data function into a Traversable collection. By itself, it's nothing huge, but you can convert this to a TraversableView which is lazy. Here's an example:

scala> toTraversable(data).view.take(3).sum
Generating 1
Generating 2
Generating 3
Generating 4
res1: Int = 6

The unfortunate nature of the take method is that it must go one past the last value generated to work correctly, but it will terminate early. The above code would look the same without the ".view" call. However, here's a more compelling example:

scala> toTraversable(data).view.take(2).foreach(println)
Generating 1
1
Generating 2
2
Generating 3

So in conclusion, I believe the collection you're looking for is TraversableView, which is easiest to create view making a Traversable and then calling "view" on it. If you really wanted the Stream type, here's a method that works in 2.8.0.final and will make a "Stream" without threads:

scala> def dataToStream( data : (Int => Unit) => Unit) = {
     |   val x = new Traversable[Int] {                     
     |     def foreach[U](f : Int => U) = {                 
     |        data( f(_) : Unit)                            
     |     }
     |   }
     |   x.view.toList.toStream                             
     | }
dataToStream: (data: ((Int) => Unit) => Unit)scala.collection.immutable.Stream[Int]

scala> dataToStream(data)
res8: scala.collection.immutable.Stream[Int] = Stream(0, ?)

The unfortunate nature of this method is that it will iterate over the entire traversable before making the stream. This also means all the values need to be buffered in memory. The only alternative is to resort to threads.

As an aside: This was the motivating reason to prefer Traversables as direct returns from scalax.io.File methods: "lines" "chars" and "bytes".

风流物 2024-10-01 23:06:54

这是一个简单的解决方案,它生成一个消耗数据的线程。它将数据发布到 SynchronousQueue。创建并返回从队列中提取数据的流:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   

Here's a simple solution that spawns a thread that consumes the data. It posts the data to a SynchronousQueue. A stream the pulls data from the queue is created and returned:

 def generatortostream[T](f: (T=>Unit)=>Unit): Stream[T] = {
  val queue = new java.util.concurrent.SynchronousQueue[Option[T]]
  val callbackthread = new Runnable {
    def run() { f((Some(_:T)) andThen (queue.put(_))); queue.put(None) }
  }   
  new Thread(callbackthread).start()
  Stream.continually(queue.take).takeWhile(_.isDefined).map(_.get)
}   
等待我真够勒 2024-10-01 23:06:54

我仍然需要自己弄清楚如何做到这一点。我怀疑答案就在这里:

编辑:删除了显示如何解决不同问题的代码。

编辑2:使用代码http最初发布的://gist.github.com/580157 http://gist.github.com /574873,你可以这样做:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data不接受块代码,但我认为这很好,因为通过延续,块可以由调用者处理。 Generator的代码可以在github上的gist中看到。

I still have to figure out how to do that myself. I suspect the answer lies somewhere here:

Edit: removed code that showed how to solved a different problem.

Edit2: Using the code http://gist.github.com/580157 that was initially posted http://gist.github.com/574873, you can do this:

object Main {
  import Generator._

  def data = generator[Int] { yld =>
    for (i <- suspendable(List.range(0, 11))) yld(i)
  }

  def main(args: Array[String]) {
    for( i <- data.toStream ) println(i)
  }
}

data does not take a block code, but I think this is fine because with the continuation, block can be handled by the caller. The code for Generator can be seen in the gist on github.

情释 2024-10-01 23:06:54

这是一个基于分隔连续的实现,改编自 @Geoff Reedy 的产品:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}

Here's a delimited continuations-based implementation, adapted from @Geoff Reedy's offering:

import Stream._
import scala.util.continuations._
import java.util.concurrent.SynchronousQueue

def toStream[A](data: (A=>Unit)=>Unit):Stream[A] = reset {
    val queue = new SynchronousQueue[Option[A]]
    queue.put(Some(shift { k: (A=>Unit) =>
        new Thread() { 
            override def run() {
                data(k)
                // when (if) the data source stops pumping, add None 
                // to signal that the stream is dead
                queue.put(None)
            }
        }.start()
        continually(queue.take).takeWhile(_.isDefined).map(_.get)
    })
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文