Scala 中有 FIFO 流吗?
我正在 Scala 中寻找 FIFO 流,即提供
- immutable.Stream(可以是有限的流,并记住已经读取的元素)
- mutable.Queue (允许向 FIFO 添加元素)
流应该是可关闭的,并且应该阻止对下一个元素的访问,直到添加该元素或关闭流。
实际上,我有点惊讶的是集合库没有(似乎)包含这样的数据结构,因为在我看来它是一个非常经典的数据结构。
我的问题:
1) 我是否忽略了什么?是否已经有一个类提供此功能?
2)好的,如果它没有包含在集合库中,那么它可能只是现有集合类的简单组合。然而,我试图找到这个简单的代码,但对于这样一个简单的问题,我的实现看起来仍然相当复杂。对于这样的 FifoStream 是否有更简单的解决方案?
class FifoStream[T] 扩展 Closeable { val 队列 = 新队列[选项[T]] 惰性 val 流 = nextStreamElem private def nextStreamElem: Stream[T] = next() 匹配 { 案例一些(元素)=> Stream.cons(elem, nextStreamElem) 情况无=>流.空 } /** 返回队列中的下一个元素(可能等待它被插入)。 */ 私有 def next() = { 队列.synchronized { if (queue.isEmpty) 队列.wait() 队列.出队() } } /** 向该流添加新元素。 */ def enqueue(elems: T*) { 队列.synchronized { 队列.enqueue(elems.map{Some(_)}: _*) 队列.notify() } } /** 关闭该流。 */ def 关闭() { 队列.synchronized { 队列.入队(无) 队列.notify() } } }
Paradigmatic的解决方案(稍作修改)
感谢您的建议。我稍微修改了范例的解决方案,以便 toStream 返回一个不可变的流(允许可重复读取),从而满足我的需求。为了完整起见,这里是代码:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of
- immutable.Stream (a stream that can be finite and memorizes the elements that have already been read)
- mutable.Queue (which allows for added elements to the FIFO)
The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.
Actually I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is IMO a quite classical one.
My questions:
1) Did I overlook something? Is there already a class providing this functionality?
2) OK, if it's not included in the collection library then it might by just a trivial combination of existing collection classes. However, I tried to find this trivial code but my implementation looks still quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?
class FifoStream[T] extends Closeable { val queue = new Queue[Option[T]] lazy val stream = nextStreamElem private def nextStreamElem: Stream[T] = next() match { case Some(elem) => Stream.cons(elem, nextStreamElem) case None => Stream.empty } /** Returns next element in the queue (may wait for it to be inserted). */ private def next() = { queue.synchronized { if (queue.isEmpty) queue.wait() queue.dequeue() } } /** Adds new elements to this stream. */ def enqueue(elems: T*) { queue.synchronized { queue.enqueue(elems.map{Some(_)}: _*) queue.notify() } } /** Closes this stream. */ def close() { queue.synchronized { queue.enqueue(None) queue.notify() } } }
Paradigmatic's solution (sightly modified)
Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable stream (allows for repeatable reads) so that it fits my needs. Just for completeness, here is the code:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
在 Scala 中,流是“函数式迭代器”。人们期望它们是纯粹的(没有副作用)和不可变的。在您的情况下,每次迭代流时都会修改队列(因此它不是纯粹的)。这可能会造成很多误解,因为迭代同一流两次会产生两个不同的结果。
话虽这么说,您应该使用 Java BlockingQueues,而不是滚动您自己的实现。它们在安全性和性能方面被认为实施得很好。这是我能想到的最干净的代码(使用你的方法):
In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In you case, everytime you iterate on the stream you modify the queue (so it's no pure). This can create a lot of misunderstandings, because iterating twice the same stream, will have two different results.
That being said, you should rather use Java BlockingQueues, rather than rolling your own implementation. They are considered well implemented in term of safety and performances. Here is the cleanest code I can think of (using your approach):
我假设您正在寻找类似 的内容java.util.concurrent.BlockingQueue?
Akka 有一个该接口的 BoundedBlockingQueue 实现。当然,java 中提供了可用的实现.util.并发。
您还可以考虑使用 Akka 的 actors 来完成您正在做的任何事情。使用 Actor 来通知或推送新事件或消息,而不是拉动。
I'm assuming you're looking for something like java.util.concurrent.BlockingQueue?
Akka has a BoundedBlockingQueue implementation of this interface. There are of course the implementations available in java.util.concurrent.
You might also consider using Akka's actors for whatever it is you are doing. Use Actors to be notified or pushed a new event or message instead of pulling.
1)您似乎正在寻找在 Oz 等语言中看到的数据流,它支持生产者-消费者模式。集合 API 中不提供此类集合,但您始终可以自己创建一个集合。
2)数据流流依赖于single的概念- 赋值变量(这样它们就不必在声明点初始化并在初始化之前读取它们会导致阻塞):
如果单赋值(或数据流),那么实现这样的流将很简单该语言支持变量(请参阅链接)。由于它们不是 Scala 的一部分,因此您必须像以前一样使用
wait
-synchronized
-notify
模式。可以使用 Java 并发队列正如其他用户所建议的那样,也可以实现这一目标。
1) It seems you're looking for a dataflow stream seen in languages like Oz, which supports the producer-consumer pattern. Such a collection is not available in the collections API, but you could always create one yourself.
2) The data flow stream relies on the concept of single-assignment variables (such that they don't have to be initialized upon declaration point and reading them prior to initialization causes blocking):
It would be straightforward to implement such a stream if single-assignment (or dataflow) variables were supported in the language (see the link). Since they are not a part of Scala, you have to use the
wait
-synchronized
-notify
pattern just like you did.Concurrent queues from Java can be used to achieve that as well, as the other user suggested.