scala优先级队列排序不正确?

发布于 2024-12-10 03:38:02 字数 4193 浏览 0 评论 0原文

我在 Scala 的 collection.mutable.PriorityQueue 中发现了一些奇怪的行为。我正在执行外部排序并使用 1M 记录对其进行测试。每次我运行测试并验证结果时,有 10-20 条记录未正确排序。我将 scala PriorityQueue 实现替换为 java.util.PriorityQueue ,并且它在 100% 的情况下都能正常工作。有什么想法吗?

这是代码(抱歉有点长......)。我使用 http://sortbenchmark.org/" rel="nofollow">http://sortbenchmark.org/" rel="nofollow">http://sortbenchmark.org/" 的工具 gensort -a 1000000valsort 对其进行测试。 /sortbenchmark.org/

def externalSort(inFileName: String, outFileName: String)
    (implicit ord: Ordering[String]): Int = {

  val MaxTempFiles = 1024
  val TempBufferSize = 4096

  val inFile = new java.io.File(inFileName)

  /** Partitions input file and sorts each partition */
  def partitionAndSort()(implicit ord: Ordering[String]):
      List[java.io.File] = {

    /** Gets block size to use */
    def getBlockSize: Long = {
      var blockSize = inFile.length / MaxTempFiles
      val freeMem = Runtime.getRuntime().freeMemory()
      if (blockSize < freeMem / 2)
        blockSize = freeMem / 2
      else if (blockSize >= freeMem)
        System.err.println("Not enough free memory to use external sort.")
      blockSize
    }

    /** Sorts and writes data to temp files */
    def writeSorted(buf: List[String]): java.io.File = {
      // Create new temp buffer
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()

      // Sort buffer and write it out to tmp file
      val out = new java.io.PrintWriter(tmp)
      try {
        for (l <- buf.sorted) {
          out.println(l)
        }
      } finally {
        out.close()
      }

      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    var currentSize = 0

    // Read input and divide into blocks
    for (line <- io.Source.fromFile(inFile).getLines()) {
      if (currentSize > blockSize) {
        tmpFiles ::= writeSorted(buf)
        buf = List[String]()
        currentSize = 0
      }
      buf ::= line
      currentSize += line.length() * 2 // 2 bytes per char
    }
    if (currentSize > 0) tmpFiles ::= writeSorted(buf)

    tmpFiles
  }

  /** Merges results of sorted partitions into one output file */
  def mergeSortedFiles(fs: List[java.io.File])
      (implicit ord: Ordering[String]): Int = {

    /** Temp file buffer for reading lines */
    class TempFileBuffer(val file: java.io.File) {

      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var curLine: String = ""

      readNextLine() // prep first value

      def currentLine = curLine

      def isEmpty = curLine == null

      def readNextLine() {
        if (curLine == null) return

        try {
          curLine = in.readLine()
        } catch {
          case _: java.io.EOFException => curLine = null
        }

        if (curLine == null) in.close()
      }

      override protected def finalize() {
        try {
          in.close()
        } finally {
          super.finalize()
        }
      }
    }

    val wrappedOrd = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int = {
        ord.compare(o1.currentLine, o2.currentLine)
      }
    }

    val pq = new collection.mutable.PriorityQueue[TempFileBuffer](
      )(wrappedOrd)

    // Init queue with item from each file
    for (tmp <- fs) {
      val buf = new TempFileBuffer(tmp)
      if (!buf.isEmpty) pq += buf
    }

    var count = 0

    val out = new java.io.PrintWriter(new java.io.File(outFileName))
    try {
      // Read each value off of queue
      while (pq.size > 0) {
        val buf = pq.dequeue()
        out.println(buf.currentLine)
        count += 1
        buf.readNextLine()
        if (buf.isEmpty) {
          buf.file.delete() // don't need anymore
        } else {
          // re-add to priority queue so we can process next line
          pq += buf
        }
      }
    } finally {
      out.close()
    }

    count
  }

  mergeSortedFiles(partitionAndSort())
}

I'm seeing some strange behavior with Scala's collection.mutable.PriorityQueue. I'm performing an external sort and testing it with 1M records. Each time I run the test and verify the results between 10-20 records are not sorted properly. I replace the scala PriorityQueue implementation with a java.util.PriorityQueue and it works 100% of the time. Any ideas?

Here's the code (sorry it's a bit long...). I test it using the tools gensort -a 1000000 and valsort from http://sortbenchmark.org/

def externalSort(inFileName: String, outFileName: String)
    (implicit ord: Ordering[String]): Int = {

  val MaxTempFiles = 1024
  val TempBufferSize = 4096

  val inFile = new java.io.File(inFileName)

  /** Partitions input file and sorts each partition */
  def partitionAndSort()(implicit ord: Ordering[String]):
      List[java.io.File] = {

    /** Gets block size to use */
    def getBlockSize: Long = {
      var blockSize = inFile.length / MaxTempFiles
      val freeMem = Runtime.getRuntime().freeMemory()
      if (blockSize < freeMem / 2)
        blockSize = freeMem / 2
      else if (blockSize >= freeMem)
        System.err.println("Not enough free memory to use external sort.")
      blockSize
    }

    /** Sorts and writes data to temp files */
    def writeSorted(buf: List[String]): java.io.File = {
      // Create new temp buffer
      val tmp = java.io.File.createTempFile("external", "sort")
      tmp.deleteOnExit()

      // Sort buffer and write it out to tmp file
      val out = new java.io.PrintWriter(tmp)
      try {
        for (l <- buf.sorted) {
          out.println(l)
        }
      } finally {
        out.close()
      }

      tmp
    }

    val blockSize = getBlockSize
    var tmpFiles = List[java.io.File]()
    var buf = List[String]()
    var currentSize = 0

    // Read input and divide into blocks
    for (line <- io.Source.fromFile(inFile).getLines()) {
      if (currentSize > blockSize) {
        tmpFiles ::= writeSorted(buf)
        buf = List[String]()
        currentSize = 0
      }
      buf ::= line
      currentSize += line.length() * 2 // 2 bytes per char
    }
    if (currentSize > 0) tmpFiles ::= writeSorted(buf)

    tmpFiles
  }

  /** Merges results of sorted partitions into one output file */
  def mergeSortedFiles(fs: List[java.io.File])
      (implicit ord: Ordering[String]): Int = {

    /** Temp file buffer for reading lines */
    class TempFileBuffer(val file: java.io.File) {

      private val in = new java.io.BufferedReader(
        new java.io.FileReader(file), TempBufferSize)
      private var curLine: String = ""

      readNextLine() // prep first value

      def currentLine = curLine

      def isEmpty = curLine == null

      def readNextLine() {
        if (curLine == null) return

        try {
          curLine = in.readLine()
        } catch {
          case _: java.io.EOFException => curLine = null
        }

        if (curLine == null) in.close()
      }

      override protected def finalize() {
        try {
          in.close()
        } finally {
          super.finalize()
        }
      }
    }

    val wrappedOrd = new Ordering[TempFileBuffer] {
      def compare(o1: TempFileBuffer, o2: TempFileBuffer): Int = {
        ord.compare(o1.currentLine, o2.currentLine)
      }
    }

    val pq = new collection.mutable.PriorityQueue[TempFileBuffer](
      )(wrappedOrd)

    // Init queue with item from each file
    for (tmp <- fs) {
      val buf = new TempFileBuffer(tmp)
      if (!buf.isEmpty) pq += buf
    }

    var count = 0

    val out = new java.io.PrintWriter(new java.io.File(outFileName))
    try {
      // Read each value off of queue
      while (pq.size > 0) {
        val buf = pq.dequeue()
        out.println(buf.currentLine)
        count += 1
        buf.readNextLine()
        if (buf.isEmpty) {
          buf.file.delete() // don't need anymore
        } else {
          // re-add to priority queue so we can process next line
          pq += buf
        }
      }
    } finally {
      out.close()
    }

    count
  }

  mergeSortedFiles(partitionAndSort())
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

雨的味道风的声音 2024-12-17 03:38:02

我的测试没有显示 PriorityQueue 中存在任何错误。

import org.scalacheck._
import Prop._

object PriorityQueueProperties extends Properties("PriorityQueue") {
  def listToPQ(l: List[String]): PriorityQueue[String] = { 
    val pq = new PriorityQueue[String]
    l foreach (pq +=)
    pq 
  }
  def pqToList(pq: PriorityQueue[String]): List[String] = 
    if (pq.isEmpty) Nil 
    else { val h = pq.dequeue; h :: pqToList(pq) }

  property("Enqueued elements are dequeued in reverse order") = 
    forAll { (l: List[String]) => l.sorted == pqToList(listToPQ(l)).reverse }

  property("Adding/removing elements doesn't break sorting") = 
    forAll { (l: List[String], s: String) => 
      (l.size > 0) ==> 
      ((s :: l.sorted.init).sorted == { 
        val pq = listToPQ(l)
        pq.dequeue
        pq += s
        pqToList(pq).reverse 
      })
    }
}

scala> PriorityQueueProperties.check
+ PriorityQueue.Enqueued elements are dequeued in reverse order: OK, passed
   100 tests.
+ PriorityQueue.Adding/removing elements doesn't break sorting: OK, passed 
  100 tests.

如果您能以某种方式减少足够的输入来制作测试用例,那将会有所帮助。

My tests don't show any bugs in PriorityQueue.

import org.scalacheck._
import Prop._

object PriorityQueueProperties extends Properties("PriorityQueue") {
  def listToPQ(l: List[String]): PriorityQueue[String] = { 
    val pq = new PriorityQueue[String]
    l foreach (pq +=)
    pq 
  }
  def pqToList(pq: PriorityQueue[String]): List[String] = 
    if (pq.isEmpty) Nil 
    else { val h = pq.dequeue; h :: pqToList(pq) }

  property("Enqueued elements are dequeued in reverse order") = 
    forAll { (l: List[String]) => l.sorted == pqToList(listToPQ(l)).reverse }

  property("Adding/removing elements doesn't break sorting") = 
    forAll { (l: List[String], s: String) => 
      (l.size > 0) ==> 
      ((s :: l.sorted.init).sorted == { 
        val pq = listToPQ(l)
        pq.dequeue
        pq += s
        pqToList(pq).reverse 
      })
    }
}

scala> PriorityQueueProperties.check
+ PriorityQueue.Enqueued elements are dequeued in reverse order: OK, passed
   100 tests.
+ PriorityQueue.Adding/removing elements doesn't break sorting: OK, passed 
  100 tests.

If you could somehow reduce the input enough to make a test case, it would help.

别挽留 2024-12-17 03:38:02

我用 500 万个输入运行了几次,输出始终符合预期。通过查看您的代码,我的猜测是您的排序是问题所在(即它给出了不一致的答案。)

I ran it with five million inputs several times, output matched expected always. My guess from looking at your code is that your Ordering is the problem (i.e. it's giving inconsistent answers.)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文