我怎样才能使这个可能简单的 Producer actor 示例正常工作?

发布于 2024-10-29 06:25:11 字数 2714 浏览 0 评论 0原文

我试图编写一个简单的制作人类,以便学习演员。我想要一个生产者,它从某个目录开始,由 File 对象表示,然后向其他参与者发送消息以处理文件。最初,我正在阅读文件的内容,但是,为了简单起见,现在我只是收集它们的路径。 再说一次,这没有现实世界的价值,但对我来说有实用价值,因为我认为这会让我更好地了解演员。这是我到目前为止所得到的:

import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong

case class FileSystemObject(path:File)
case class FileContent(content:String)

case object Stop
case object Processed

class ResultAcumulator extends Actor {

    var results:List[String] = Nil
    var finished = false

    def act() = {
        loop {
            react {
                case FileContent(content) => {
                    results ::= content
                }
                case Stop => {
                    finished = true;
                    exit
                }
            }
        }
    }
}

class FileSystemReader(accumulator:Actor) extends Actor {
    def act() = {
        loop {
            react {
                case FileSystemObject(path) => {
                    if(path.isFile) {
                        accumulator ! FileContent(path.toString)
                        sender ! Processed
                    }
                }
                case Stop => exit
            }
        }
    }
}

class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
    var totalFilesProcessed = 0

    def act() = {
        val files = start.listFiles
        files.foreach{ f =>
            (reader ! FileSystemObject(f))
        }
        loop {
            react {
                case Processed => {
                    totalFilesProcessed += 1
                    if(totalFilesProcessed == files.length) {
                        reader ! Stop
                        acumulator ! Stop
                        Xo.decrementLatch
                    }
                }
            }
        }

    }
}
object Xo {

     var latch = new CountDownLatch(1)

     def decrementLatch = latch.countDown

     def main(args : Array[String]) = {

         val acumulator = new ResultAcumulator
         val fsReader = new FileSystemReader(acumulator)
         val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)

         acumulator.start
         fsReader.start
         producer.start

         latch.await

         acumulator.results.foreach(println)

     }
}

在现在的状态下,它会永远运行,而且我看不到任何输出。啊,还有一件事。在程序退出之前,我希望它列出“已处理”的结果。我搜索了一下,发现了 CountDownLatch 类。我想用循环/反应而不是 while/receive 来实现它。我很确定这个问题是由我有这些行引起的:

files.foreach{ f =>
    (reader ! FileSystemObject(f))
}

并且我的反应循环稍低一些,但我不知道如何修复它。

I was trying to write a simple producer class, in order to learn Actors. I wanted to have a producer, which starts with some directory, represented by a File object, and then sends messages to other actors in order to process the files. Initially, I was reading the contents of the files, but, for simplicity's sake, now I'm just collecting their paths.
Once again, this has no real world value, but it has practical value to me, as I think this will allow me to understand the actors better. Here's what I have so far:

import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong

case class FileSystemObject(path:File)
case class FileContent(content:String)

case object Stop
case object Processed

class ResultAcumulator extends Actor {

    var results:List[String] = Nil
    var finished = false

    def act() = {
        loop {
            react {
                case FileContent(content) => {
                    results ::= content
                }
                case Stop => {
                    finished = true;
                    exit
                }
            }
        }
    }
}

class FileSystemReader(accumulator:Actor) extends Actor {
    def act() = {
        loop {
            react {
                case FileSystemObject(path) => {
                    if(path.isFile) {
                        accumulator ! FileContent(path.toString)
                        sender ! Processed
                    }
                }
                case Stop => exit
            }
        }
    }
}

class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
    var totalFilesProcessed = 0

    def act() = {
        val files = start.listFiles
        files.foreach{ f =>
            (reader ! FileSystemObject(f))
        }
        loop {
            react {
                case Processed => {
                    totalFilesProcessed += 1
                    if(totalFilesProcessed == files.length) {
                        reader ! Stop
                        acumulator ! Stop
                        Xo.decrementLatch
                    }
                }
            }
        }

    }
}
object Xo {

     var latch = new CountDownLatch(1)

     def decrementLatch = latch.countDown

     def main(args : Array[String]) = {

         val acumulator = new ResultAcumulator
         val fsReader = new FileSystemReader(acumulator)
         val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)

         acumulator.start
         fsReader.start
         producer.start

         latch.await

         acumulator.results.foreach(println)

     }
}

In the state it is now, it runs forever, and I see no output. Ah, one more thing. Before the program exits, I would like it to list the results "processed". I searched around a little and found the CountDownLatch class. I would like to keep this implemented with a loop/react instead of a while/receive. I am pretty sure the problem is caused by the fact that I have these lines:

files.foreach{ f =>
    (reader ! FileSystemObject(f))
}

and that I have the react loop a little lower, but I have no clue how to go about fixing it.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

人疚 2024-11-05 06:25:11

我想相关部分是
case FileSystemObject(路径) => {
if(路径.isFile) {
累加器!文件内容(路径.toString)
发件人!已加工
}
}
这里,不是“普通文件”的东西,例如目录,不会被发送到累加器。
因此,如果“d:/rails/a”中有子目录,则测试 totalFilesProcessed == files.length 将始终失败。

I guess that the relevant part is
case FileSystemObject(path) => {
if(path.isFile) {
accumulator ! FileContent(path.toString)
sender ! Processed
}
}
Here things that are not "normal Files", e.g., directories, are not sent to the accumulator.
So if there subdirectories in your "d:/rails/a" the test totalFilesProcessed == files.length will always fail.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文