How to duplicate an iterator in Scala?


About duplicate

This is NOT a duplicate of How to clone an iterator?

Please do not blindly close this question: none of the answers given in the so-called duplicate work. The OP there was dealing with a different problem, and obviously the answers fitted HIS problem, but not mine.

Not every similar question is a duplicate; since there is no such feature as an "expansion question" on SE, the only way to get different, working answers on the same subject is to ask again.

Problem

I have an iterator. I would like to get a copy (duplicate) of it, so that I can then work with the original and the copy completely independently.

Important

Copying through reflection or serialization is a no-go (performance penalty).

Example

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

var it2 = it1   // (*) it2 is just another name for the same iterator object
it2.next()

println(it1.next())

This simply makes it2 a reference to it1, so when it1 advances, it2 advances as well, and vice versa.

The example above uses List, and I am currently struggling with HashMap, but the question is a general one -- it is just about iterators.

Approach #1

If you edit line (*) and write:

var it2 = it1.toList.iterator

(this was suggested as a solution in the linked question), an exception is thrown when the program is executed.
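
My reading of that failure (a minimal sketch, not taken from the linked question): toList itself drains it1 in order to build the list, so the original iterator is exhausted afterwards and the final it1.next() throws.

val it1 = List(1, 2, 3, 4, 5).iterator
it1.next()

val it2 = it1.toList.iterator  // toList drains it1 to build the new list
it2.next()                     // fine: it2 is backed by the freshly built list

println(it1.next())            // throws NoSuchElementException: it1 is already empty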

Approach #2

"You take the list and...". No, I don't. I don't have a list, I have iterator. In general I don't know anything about collection which underlies the iterator, the only thing I have is iterator. I have to "fork" it.


Comments (4)

橘味果▽酱 2024-12-17 13:06:22


You can't duplicate an iterator without destroying it. The contract of Iterator is that it can only be traversed once.

The question you linked to shows how to get two copies in exchange for the one you've destroyed. You cannot keep using the original, but you can now run the two new copies forward independently.
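
A minimal sketch of that trade, using the standard library's Iterator.duplicate (my illustration, not part of the original answer):

val original = List(1, 2, 3).iterator
val (copy1, copy2) = original.duplicate  // original must not be touched after this

println(copy1.next())  // 1
println(copy2.next())  // 1 -- the two copies advance independently
println(copy1.next())  // 2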

权谋诡计 2024-12-17 13:06:22


It's pretty easy to create a List iterator that you can duplicate without destroying it: this is basically the definition of the iterator method copied from the List source, with a fork method added:

class ForkableIterator[A](list: List[A]) extends Iterator[A] {
  private var these = list  // the remaining suffix of the (immutable) list
  def hasNext: Boolean = these.nonEmpty
  def next(): A =
    if (hasNext) {
      val result = these.head; these = these.tail; result
    } else Iterator.empty.next()
  def fork = new ForkableIterator(these)  // O(1): the fork shares the immutable tail
}

Use:

scala> val it = new ForkableIterator(List(1,2,3,4,5,6))
it: ForkableIterator[Int] = non-empty iterator

scala> it.next
res72: Int = 1

scala> val it2 = it.fork
it2: ForkableIterator[Int] = non-empty iterator

scala> it2.next
res73: Int = 2

scala> it2.next
res74: Int = 3

scala> it.next
res75: Int = 2

I had a look at doing this for HashMap, but it seems more complicated (partly because there are different map implementations depending on the collection size), so it's probably best to use the above implementation on yourMap.toList.
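
For example (my sketch; myMap is a hypothetical map standing in for whatever map you have):

val myMap = scala.collection.immutable.HashMap("a" -> 1, "b" -> 2, "c" -> 3)
val it = new ForkableIterator(myMap.toList)  // snapshot the entries into a List first

it.next()          // some entry; HashMap iteration order is unspecified
val it2 = it.fork  // it and it2 now advance independently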

默嘫て 2024-12-17 13:06:22


As Rex said, it is impossible to make a copy of an Iterator without destroying it. That said, what is the problem with duplicate?

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

val (it1a, it1b) = it1.duplicate  // it1 is consumed here; use only the two fresh copies
it1 = it1a
var it2 = it1b
it2.next()

println(it1.next())
故人如初 2024-12-17 13:06:22


I think this is a very good question; it's a pity that so many people don't understand its value. In the age of Big Data there are a lot of situations where we have a stream, not a materialized list: the data cannot be collected or would not fit into memory, and replaying it from the very beginning is costly too. What can we do if we need two (or more) separate computations over the data? For example, we may need to compute min, max, sum, md5, etc., using already-written functions, in different threads, with only one pass over the source.

The general solution is to use Akka-Stream. That will do it.
But is it possible with Iterator, which is the simplest way in Java/Scala to represent such a streaming data source?
The answer is yes, although we "could NOT proceed with the original and the copy completely independently": we have to synchronize the consumption speed of each consumer thread. (Akka-Stream achieves this with back-pressure and some intermediate buffers.)

So here is my simple solution: use a Phaser. With it we can build an Iterator wrapper over a one-pass source. The resulting object is used in each consumer thread as a plain Iterator. To use it, you have to know the number of consuming threads in advance. Also, each consumer thread MUST drain the source to the end to avoid hanging all the others (using the flush() method below, for example).

import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicBoolean

// it0 - input source iterator
// num - exact number of consuming threads; we have to know it in advance
case class ForkableIterator[+A](it0: Iterator[A], num: Int) extends Phaser(num) with Iterator[A] {

  val it = it0.flatMap(Stream.fill(num)(_))  // serial replicator: every element is emitted num times, once per consumer

  private val hasNext0 = new AtomicBoolean(it0.hasNext)
  override def hasNext: Boolean = hasNext0.get()

  override def next(): A = {
    arriveAndAwaitAdvance()  // wait until every consumer is ready for the next element
    val next = it.synchronized {
      val next = it.next()
      if (hasNext0.get) hasNext0.set(it.hasNext)
      next
    }
    arriveAndAwaitAdvance()  // otherwise a task blocks at the end, on the last data element
    next
  }

  // If a consumer gives up reading before the end of its source data stream,
  // it HAS to drain the rest to avoid blocking all the others.
  // (Note: does Phaser have no "unregister" method? See the PPS below.)
  // Calling this can be avoided if all consumers read exactly the same amount of data,
  // e.g. until the very end of it.
  def flush(): Unit = while (hasNext) next()
}

PS: I have used this "ForkableIterator" successfully with Spark, to perform several independent aggregations over a long stream of source data. In that case I didn't have to bother with creating threads manually. You may also use Scala Futures / Monix Tasks, etc.

PPS: I have now rechecked the JDK Phaser specification and found that it actually does have an "unregister" method, called arriveAndDeregister(). So use it instead of flush() when a consumer finishes early.
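
To make the intended usage concrete, here is a minimal sketch of two consumer threads sharing one source (my illustration, not from the original answer, assuming Scala 2.12+; the names ForkableIteratorDemo, source, sumThread and maxThread are mine):

object ForkableIteratorDemo extends App {
  val source = Iterator(1, 2, 3, 4, 5)
  val forked = ForkableIterator(source, num = 2)  // exactly two consumers, known in advance

  val sumThread = new Thread(() => {
    var sum = 0
    while (forked.hasNext) sum += forked.next()  // one pass: running sum
    println(s"sum = $sum")
  })
  val maxThread = new Thread(() => {
    var max = Int.MinValue
    while (forked.hasNext) max = math.max(max, forked.next())  // one pass: running max
    println(s"max = $max")
  })

  sumThread.start(); maxThread.start()
  sumThread.join(); maxThread.join()  // both drain to the end, so no flush() is needed
}

This should print sum = 15 and max = 5 (in either order), each source element having been produced only once.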
