How do I write an aggregation pattern in Scala?

Posted on 2024-12-04 03:11:38

Suppose I have an Iterator[A] (of unbounded size) and I want to derive an Iterator[B] from it, where runs of consecutive values of type A are aggregated into a single B.

Example: I have an iterator of strings:

Iterator(
    "START",
    "DATA1",
    "DATA2",
    "DATA3",
    "START",
    "DATA1",
    "DATA2",
    //.. 10^10 more records
)

I want to join the strings from each START up to, but not including, the next START. In other words, I want to write a parser.

Iterator(
    "START DATA1 DATA2 DATA3",
    "START DATA1 DATA2",
    //.. 10^10 / 5 more records
)

I know how to do this imperatively, but I want to accomplish it with Scala higher-order functions. Any ideas?

P.S. This corresponds to the Aggregator pattern from Enterprise Integration Patterns (EIP): http://camel.apache.org/aggregator2.html


Comments (4)

童话 2024-12-11 03:11:38

If you want a functional solution, you should use Streams rather than iterators (streams are immutable). Here's one possible approach:

// Entry point: assumes the stream begins with a "START" marker.
def aggregate(strs: Stream[String]): Stream[String] =
  aggregateRec(strs)

def aggregateRec(strs: Stream[String]): Stream[String] = {
  val tail = strs.drop(1) // skip the leading "START"
  if (tail.nonEmpty) {
    val (str, rest) = accumulate(tail)
    Stream.cons(str, aggregateRec(rest)) // the tail is evaluated lazily
  }
  else Stream.empty
}

// Joins everything up to the next "START" and returns the remaining stream.
def accumulate(strs: Stream[String]): (String, Stream[String]) = {
  val first = "START " + strs.takeWhile(_ != "START").mkString(" ")
  val rest = strs.dropWhile(_ != "START")
  (first, rest)
}

It works as expected:

val strs = Stream( "START", "1", "2", "3", "START", "A", "B" )
val strs2 = aggregate( strs )
strs2 foreach println
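
As a variation on the approach above, here is a minimal sketch assuming Scala 2.13 or later, where Stream is deprecated in favour of LazyList. The object name LazyAggregate and the marker parameter are hypothetical names introduced for illustration, and discarding elements that appear before the first marker is an assumption the question does not specify.

```scala
object LazyAggregate {
  // Lazily groups "marker, item, item, ..." runs into single space-joined strings.
  // Assumption: anything before the first marker is discarded.
  def aggregate(strs: LazyList[String], marker: String = "START"): LazyList[String] = {
    val s = strs.dropWhile(_ != marker)
    if (s.isEmpty) LazyList.empty
    else {
      // span splits the remainder into this group's body and everything after it
      val (body, rest) = s.tail.span(_ != marker)
      // #:: defers both sides, so later groups are built only on demand
      (s.head #:: body).mkString(" ") #:: aggregate(rest, marker)
    }
  }
}
```

Note that a LazyList memoizes the cells it has already evaluated, so with very large inputs it is probably wise not to keep a reference to the head of the result while traversing it.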
萝莉病 2024-12-11 03:11:38

Well, an infinite stream changes things rather dramatically. Assuming I understand the rest of your situation, this should work:

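def aggregate(it: Iterator[String]): Iterator[String] = new Iterator[String] {
  if (it.hasNext) it.next() // discard the leading "START"
  def hasNext: Boolean = it.hasNext
  // takeWhile also consumes the next "START", so each call yields one group
  def next(): String = "START " + it.takeWhile(_ != "START").mkString(" ")
}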

You can then write:

val i = aggregate(yourStream.iterator)
i.take(20).foreach(println) // or whatever
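
The wrapper above keeps calling methods on `it` after `takeWhile` has been invoked on it, which the Iterator documentation advises against even though it tends to work in practice. A minimal sketch of an alternative that only uses `hasNext`, `head` and `next()` on a buffered iterator might look like this. The names MarkerGroups and marker are hypothetical, and skipping elements before the first marker is an assumption.

```scala
object MarkerGroups {
  // Hypothetical helper: lazily joins each "marker, item, item, ..." run into one string.
  def aggregate(source: Iterator[String], marker: String = "START"): Iterator[String] =
    new Iterator[String] {
      private val in = source.buffered // buffered allows peeking via `head`

      // Assumption: elements before the first marker are skipped (done eagerly here).
      while (in.hasNext && in.head != marker) in.next()

      def hasNext: Boolean = in.hasNext

      def next(): String = {
        val sb = new StringBuilder(in.next()) // the marker itself
        // Append items until the next marker is peeked; the marker is not consumed.
        while (in.hasNext && in.head != marker) sb.append(' ').append(in.next())
        sb.toString
      }
    }
}
```

Only one group is held in memory at a time, so this should also cope with an unbounded source, provided each group is finite.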
晨光如昨 2024-12-11 03:11:38

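You could try it with a fold (note that a fold consumes the whole input, so this suits a finite list rather than an unbounded iterator):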

val ls = List(
  "START",
  "DATA1",
  "DATA2",
  "DATA3",
  "START",
  "DATA1",
  "DATA2"
)

// foldLeft replaces the /: operator, which is deprecated since Scala 2.13
ls.foldLeft(List.empty[List[String]]) { (acc, elem) =>
  if (elem == "START")
    List(elem) :: acc // start a new group at the head
  else
    (elem :: acc.head) :: acc.tail // prepend to the current group
}.map(_.reverse.mkString(" ")).reverse
洛阳烟雨空心柳 2024-12-11 03:11:38

With Streams:

object Iter {
  def main(args: Array[String]): Unit = {
    val es = List("START", "DATA1", "DATA2", "START", "DATA1", "START")
    val bit = batched(es.iterator, "START")
    println(bit.head.toList)
    println(bit.tail.head.toList)
  }

  def batched[T](it: Iterator[T], start: T): Stream[List[T]] = {
    def nextBatch(): Stream[List[T]] = {
      // Without this check, an exhausted iterator keeps yielding Nil and loops forever
      if (!it.hasNext) Stream.empty
      else (it takeWhile { _ != start }).toList match {
        case Nil => nextBatch() // empty batch, e.g. before the leading marker
        case es => Stream.cons(start :: es, nextBatch()) // the tail is lazy
      }
    }
    nextBatch()
  }

}