Is there a FIFO stream in Scala?



I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of

  • immutable.Stream (a stream that can be finite and memoizes the elements that have already been read)
  • mutable.Queue (which allows for added elements to the FIFO)

The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.

Actually I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is IMO a quite classical one.

My questions:

  • 1) Did I overlook something? Is there already a class providing this functionality?

  • 2) OK, if it's not included in the collection library then it might be just a trivial combination of existing collection classes. However, I tried to find this trivial code, but my implementation still looks quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?

    import java.io.Closeable
    import scala.collection.mutable.Queue

    class FifoStream[T] extends Closeable {

      val queue = new Queue[Option[T]]

      lazy val stream = nextStreamElem

      private def nextStreamElem: Stream[T] = next() match {
        case Some(elem) => Stream.cons(elem, nextStreamElem)
        case None       => Stream.empty
      }

      /** Returns next element in the queue (may wait for it to be inserted). */
      private def next() = {
        queue.synchronized {
          // Loop (rather than a single check) to guard against spurious wakeups.
          while (queue.isEmpty) queue.wait()
          queue.dequeue()
        }
      }

      /** Adds new elements to this stream. */
      def enqueue(elems: T*) {
        queue.synchronized {
          queue.enqueue(elems.map{Some(_)}: _*)
          queue.notify()
        }
      }

      /** Closes this stream. */
      def close() {
        queue.synchronized {
          queue.enqueue(None)
          queue.notify()
        }
      }
    }
    

Paradigmatic's solution (slightly modified)

Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable stream (allowing repeatable reads), which fits my needs. Just for completeness, here is the code:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
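
For illustration, here is a minimal usage sketch of the class above (the demo object and variable names are my own): a separate thread produces elements and closes the stream, while the main thread reads the memoized toStream twice to show that reads block until data arrives and are repeatable.

object FIFOStreamDemo extends App {
  val s = new FIFOStream[Int]()            // uses the default LinkedBlockingQueue

  new Thread(new Runnable {
    def run() {
      s.enqueue(1, 2, 3)                   // producer pushes elements...
      s.close()                            // ...and marks the end of the stream
    }
  }).start()

  println(s.toStream.toList)               // blocks until the elements and the close marker arrive: List(1, 2, 3)
  println(s.toStream.toList)               // same result again, since toStream is a memoized lazy val
}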

Comments (3)

忆伤 2024-12-13 12:09:34


In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In your case, every time you iterate over the stream you modify the queue (so it's not pure). This can create a lot of misunderstanding, because iterating over the same stream twice will give two different results.

That being said, you should use Java's BlockingQueues rather than rolling your own implementation. They are considered well implemented in terms of safety and performance. Here is the cleanest code I can think of (using your approach):

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  // Companion constructor: wraps a fresh LinkedBlockingQueue in a FIFOStream.
  def apply[A]() = new FIFOStream[A]( new LinkedBlockingQueue[Option[A]]() )
}
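
Just to spell out the difference from the version in the question (the example names are mine): toStream is a def here, so every call drains the queue and the stream can effectively be consumed only once.

val s = FIFOStream[Int]()        // built via the companion object above
s.enqueue(1, 2, 3)
s.close()
println(s.toStream.toList)       // List(1, 2, 3) — this pass consumes the queue, including the closing None
// A second call to s.toStream would block on the now-empty queue.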
梦醒灬来后我 2024-12-13 12:09:34


I'm assuming you're looking for something like java.util.concurrent.BlockingQueue?

Akka has a BoundedBlockingQueue implementation of this interface. There are of course the implementations available in java.util.concurrent.

You might also consider using Akka's actors for whatever it is you are doing. Use actors to be notified of, or pushed, new events or messages instead of pulling them.
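
As a rough illustration of that push-based style (not code from the answer), here is a minimal sketch using Akka's classic actor API (2.4+); it assumes akka-actor is on the classpath, and the actor, message, and object names are made up:

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical messages: elements are pushed to the consumer, Close marks the end.
case class Element(value: Int)
case object Close

class Consumer extends Actor {
  def receive = {
    case Element(v) => println("got " + v)  // react to each pushed element
    case Close      => context.stop(self)   // the producer signals that nothing more will come
  }
}

object PushDemo extends App {
  val system = ActorSystem("fifoDemo")
  val consumer = system.actorOf(Props[Consumer], "consumer")

  // The producer pushes messages; the consumer never pulls from a queue.
  consumer ! Element(1)
  consumer ! Element(2)
  consumer ! Close

  Thread.sleep(500)                          // crude wait so the mailbox is processed (sketch only)
  system.terminate()
}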

无所的.畏惧 2024-12-13 12:09:34


1) It seems you're looking for a dataflow stream seen in languages like Oz, which supports the producer-consumer pattern. Such a collection is not available in the collections API, but you could always create one yourself.

2) The dataflow stream relies on the concept of single-assignment variables (they don't have to be initialized at the point of declaration, and reading them prior to initialization blocks):

val x: Int
startThread {
  println(x)
}
println("The other thread waits for the x to be assigned")
x = 1

It would be straightforward to implement such a stream if single-assignment (or dataflow) variables were supported in the language (see the link). Since they are not a part of Scala, you have to use the wait-synchronized-notify pattern just like you did.

Concurrent queues from Java can be used to achieve that as well, as the other user suggested.
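
If it helps, the single-assignment example above can be approximated in plain Scala with a Promise playing the role of the dataflow variable: reading its Future blocks until the value is written exactly once. This is only a sketch of the idea (the object name is mine), not the Oz semantics:

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object DataflowSketch extends App {
  val x = Promise[Int]()                    // the single-assignment "dataflow variable"

  Future {
    // Reading blocks until x has been assigned.
    println(Await.result(x.future, Duration.Inf))
  }

  println("The other thread waits for the x to be assigned")
  x.success(1)                              // assign exactly once; a second assignment would throw
  Thread.sleep(100)                         // crude wait so the Future gets a chance to print (sketch only)
}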
