没有 OutOfMemory 错误的 Scala 流的功能处理

发布于 2024-10-01 11:52:12 字数 821 浏览 1 评论 0原文

是否可以将函数式编程应用于 Scala 流,以便按顺序处理流,但可以对流中已处理的部分进行垃圾收集?

例如,我定义一个包含从 startend 的数字的 Stream

def fromToStream(start: Int, end: Int) : Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start+1, end)
}

如果我以函数式风格对这些值求和:

println(fromToStream(1,10000000).reduceLeft(_+_))

我得到OutOfMemoryError - 可能是因为调用 reduceLeft 的堆栈帧保存了对流头部的引用。但如果我以迭代方式执行此操作,它会起作用:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

有没有一种方法可以以函数式方式执行此操作而不会出现 OutOfMemory

更新:这是 scala 中的一个错误现在已修复。所以现在这或多或少已经过时了。

Is it possible to apply functional programming to Scala streams such that the stream is processed sequentially, but the already processed part of the stream can be garbage collected?

For example, I define a Stream that contains the numbers from start to end:

def fromToStream(start: Int, end: Int) : Stream[Int] = {
  if (end < start) Stream.empty
  else start #:: fromToStream(start+1, end)
}

If I sum up the values in a functional style:

println(fromToStream(1,10000000).reduceLeft(_+_))

I get an OutOfMemoryError - perhaps since the stackframe of the call to reduceLeft holds a reference to the head of the stream. But if I do this in iterative style, it works:

var sum = 0
for (i <- fromToStream(1,10000000)) {
  sum += i
}

Is there a way to do this in a functional style without getting an OutOfMemory?

UPDATE: This was a bug in scala that is fixed now. So this is more or less out of date now.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

北笙凉宸 2024-10-08 11:52:12

当我开始学习 Stream 时,我觉得这很酷。然后我意识到 Iterator 是我几乎一直想使用的东西。

如果您确实需要 Stream 但想让 reduceLeft 工作:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _)

如果您尝试上面的行,它会很好地进行垃圾收集。我发现使用 Stream 很棘手,因为很容易在没有意识到的情况下抓住头。有时标准库会以非常微妙的方式为您保留它。

When I started learning about Stream I thought this was cool. Then I realized Iterator is what I want to use nearly all the time.

In case you do need Stream but want to make reduceLeft work:

fromToStream(1,10000000).toIterator.reduceLeft(_ + _)

If you try the line above, it will garbage collect just fine. I have found that using Stream is tricky as it's easy to hold on to the head without realizing it. Sometimes the standard lib will hold on to it for you - in very subtle ways.

你不是我要的菜∠ 2024-10-08 11:52:12

是的,你可以。技巧是使用尾递归方法,以便本地堆栈帧包含对 Stream 实例的唯一引用。由于该方法是尾递归的,一旦递归调用自身,对前一个 Stream 头的本地引用就会被删除,从而使 GC 能够收集到 Stream 的开头随你走。

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

另外,您必须确保传递给上面方法 last 的内容在堆栈上只有一个引用。如果将 Stream 存储到局部变量或值中,则当您调用 last 方法时,它不会被垃圾回收,因为它的参数不是留给 的唯一引用。代码>流。下面的代码耗尽了内存。

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

总结一下:

  1. 使用尾递归方法
  2. 将它们注释为尾递归
  3. 当您调用它们时,请确保它们的参数是对 Stream 的唯一引用

编辑:

请注意,这也有效,并且不会导致内存不足错误:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

并且在您需要的reduceLeft的情况下,您必须为结果定义一个带有累加器参数的辅助方法。

对于reduceLeft,您需要一个累加器参数,您可以使用默认参数将其设置为某个值。一个简化的例子:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032

Yes, you can. The trick is to use tail recursive methods, so that the local stack frame contains the only reference to the Stream instance. Since the method is tail-recursive, the local reference to the previous Stream head will be erased once it recursively calls itself, thus enabling the GC to collect the start of the Stream as you go.

Welcome to Scala version 2.9.0.r23459-b20101108091606 (Java HotSpot(TM) Server VM, Java 1.6.0_20).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import collection.immutable.Stream
import collection.immutable.Stream

scala> import annotation.tailrec
import annotation.tailrec

scala> @tailrec def last(s: Stream[Int]): Int = if (s.tail.isEmpty) s.head else last(s.tail)
last: (s: scala.collection.immutable.Stream[Int])Int

scala> last(Stream.range(0, 100000000))                                                                             
res2: Int = 99999999

Also, you must ensure that the thing you pass to the method last above has only one reference on the stack. If you store a Stream into a local variable or value, it will not be garbage collected when you call the last method, since its argument is not the only reference left to Stream. The code below runs out of memory.

scala> val s = Stream.range(0, 100000000)                                                                           
s: scala.collection.immutable.Stream[Int] = Stream(0, ?)                                                            

scala> last(s)                                                                                                      
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space                                              
        at sun.net.www.ParseUtil.encodePath(ParseUtil.java:84)                                                      
        at sun.misc.URLClassPath$JarLoader.checkResource(URLClassPath.java:674)                                     
        at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:759)                                       
        at sun.misc.URLClassPath.getResource(URLClassPath.java:169)                                                 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:194)                                                   
        at java.security.AccessController.doPrivileged(Native Method)                                               
        at java.net.URLClassLoader.findClass(URLClassLoader.java:190)                                               
        at java.lang.ClassLoader.loadClass(ClassLoader.java:307)                                                    
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)                                            
        at java.lang.ClassLoader.loadClass(ClassLoader.java:248)                                                    
        at scala.tools.nsc.Interpreter$Request$anonfun$onErr$1$1.apply(Interpreter.scala:978)                      
        at scala.tools.nsc.Interpreter$Request$anonfun$onErr$1$1.apply(Interpreter.scala:976)                      
        at scala.util.control.Exception$Catch.apply(Exception.scala:80)
        at scala.tools.nsc.Interpreter$Request.loadAndRun(Interpreter.scala:984)                                    
        at scala.tools.nsc.Interpreter.loadAndRunReq$1(Interpreter.scala:579)                                       
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:599)                                             
        at scala.tools.nsc.Interpreter.interpret(Interpreter.scala:576)
        at scala.tools.nsc.InterpreterLoop.reallyInterpret$1(InterpreterLoop.scala:472)                             
        at scala.tools.nsc.InterpreterLoop.interpretStartingWith(InterpreterLoop.scala:515)                         
        at scala.tools.nsc.InterpreterLoop.command(InterpreterLoop.scala:362)
        at scala.tools.nsc.InterpreterLoop.processLine$1(InterpreterLoop.scala:243)
        at scala.tools.nsc.InterpreterLoop.repl(InterpreterLoop.scala:249)
        at scala.tools.nsc.InterpreterLoop.main(InterpreterLoop.scala:559)
        at scala.tools.nsc.MainGenericRunner$.process(MainGenericRunner.scala:75)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:31)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)

To summarize:

  1. Use tail-recursive methods
  2. Annotate them as tail-recursive
  3. When you call them, ensure that their argument is the only reference to the Stream

EDIT:

Note that this also works and does not result in an out of memory error:

scala> def s = Stream.range(0, 100000000)                                                   
s: scala.collection.immutable.Stream[Int]

scala> last(s)                                                                              
res1: Int = 99999999

EDIT2:

And in the case of reduceLeft that you require, you would have to define a helper method with an accumulator argument for the result.

For reduceLeft, you need an accumulator argument, which you can set to a certain value using default arguments. A simplified example:

scala> @tailrec def rcl(s: Stream[Int], acc: Int = 0): Int = if (s.isEmpty) acc else rcl(s.tail, acc + s.head)
rcl: (s: scala.collection.immutable.Stream[Int],acc: Int)Int

scala> rcl(Stream.range(0, 10000000))
res6: Int = -2014260032
生生漫 2024-10-08 11:52:12

您可能想查看 Scalaz 的 临时流

You may want to look at Scalaz's ephemeral streams.

喜爱纠缠 2024-10-08 11:52:12

事实证明,这是当前实现中的一个错误的reduceLeft。问题是reduceLeft调用foldLeft,因此reduceLeft的栈帧在整个调用过程中保存了对流头部的引用。 FoldLeft 使用尾递归来避免这个问题。比较:

(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)

它们在语义上是等效的。在Scala 2.8.0 版本中,对foldLeft 的调用可以工作,但对reduceLeft 的调用会抛出OutOfMemory。如果reduceLeft 能够完成自己的工作,则不会出现此问题。

As it turns out, this is a bug in the current implementation of reduceLeft. The problem is that reduceLeft calls foldLeft, and thus the stackframe of reduceLeft holds a reference to the head of the stream during the whole call. foldLeft uses tail-recursion to avoid this problem. Compare:

(1 to 10000000).toStream.foldLeft(0)(_+_)
(1 to 10000000).toStream.reduceLeft(_+_)

These are semantically equivalent. In Scala version 2.8.0 the call to foldLeft works, but the call to reduceLeft throws an OutOfMemory. If reduceLeft would do its own work, this problem would not occur.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文