我怎样才能使这个可能简单的 Producer actor 示例正常工作?
我试图编写一个简单的制作人类,以便学习演员。我想要一个生产者,它从某个目录开始,由 File 对象表示,然后向其他参与者发送消息以处理文件。最初,我正在阅读文件的内容,但是,为了简单起见,现在我只是收集它们的路径。 再说一次,这没有现实世界的价值,但对我来说有实用价值,因为我认为这会让我更好地了解演员。这是我到目前为止所得到的:
import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
case class FileSystemObject(path:File)
case class FileContent(content:String)
case object Stop
case object Processed
class ResultAcumulator extends Actor {
var results:List[String] = Nil
var finished = false
def act() = {
loop {
react {
case FileContent(content) => {
results ::= content
}
case Stop => {
finished = true;
exit
}
}
}
}
}
class FileSystemReader(accumulator:Actor) extends Actor {
def act() = {
loop {
react {
case FileSystemObject(path) => {
if(path.isFile) {
accumulator ! FileContent(path.toString)
sender ! Processed
}
}
case Stop => exit
}
}
}
}
class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
var totalFilesProcessed = 0
def act() = {
val files = start.listFiles
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
loop {
react {
case Processed => {
totalFilesProcessed += 1
if(totalFilesProcessed == files.length) {
reader ! Stop
acumulator ! Stop
Xo.decrementLatch
}
}
}
}
}
}
object Xo {
var latch = new CountDownLatch(1)
def decrementLatch = latch.countDown
def main(args : Array[String]) = {
val acumulator = new ResultAcumulator
val fsReader = new FileSystemReader(acumulator)
val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)
acumulator.start
fsReader.start
producer.start
latch.await
acumulator.results.foreach(println)
}
}
在现在的状态下,它会永远运行,而且我看不到任何输出。啊,还有一件事。在程序退出之前,我希望它列出“已处理”的结果。我搜索了一下,发现了 CountDownLatch 类。我想用循环/反应而不是 while/receive 来实现它。我很确定这个问题是由我有这些行引起的:
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
并且我的反应循环稍低一些,但我不知道如何修复它。
I was trying to write a simple producer class, in order to learn Actors. I wanted to have a producer, which starts with some directory, represented by a File object, and then sends messages to other actors in order to process the files. Initially, I was reading the contents of the files, but, for simplicity's sake, now I'm just collecting their paths.
Once again, this has no real world value, but it has practical value to me, as I think this will allow me to understand the actors better. Here's what I have so far:
import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
case class FileSystemObject(path:File)
case class FileContent(content:String)
case object Stop
case object Processed
class ResultAcumulator extends Actor {
var results:List[String] = Nil
var finished = false
def act() = {
loop {
react {
case FileContent(content) => {
results ::= content
}
case Stop => {
finished = true;
exit
}
}
}
}
}
class FileSystemReader(accumulator:Actor) extends Actor {
def act() = {
loop {
react {
case FileSystemObject(path) => {
if(path.isFile) {
accumulator ! FileContent(path.toString)
sender ! Processed
}
}
case Stop => exit
}
}
}
}
class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
var totalFilesProcessed = 0
def act() = {
val files = start.listFiles
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
loop {
react {
case Processed => {
totalFilesProcessed += 1
if(totalFilesProcessed == files.length) {
reader ! Stop
acumulator ! Stop
Xo.decrementLatch
}
}
}
}
}
}
object Xo {
var latch = new CountDownLatch(1)
def decrementLatch = latch.countDown
def main(args : Array[String]) = {
val acumulator = new ResultAcumulator
val fsReader = new FileSystemReader(acumulator)
val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)
acumulator.start
fsReader.start
producer.start
latch.await
acumulator.results.foreach(println)
}
}
In the state it is now, it runs forever, and I see no output. Ah, one more thing. Before the program exits, I would like it to list the results "processed". I searched around a little and found the CountDownLatch class. I would like to keep this implemented with a loop/react instead of a while/receive. I am pretty sure the problem is caused by the fact that I have these lines:
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
and that I have the react loop a little lower, but I have no clue how to go about fixing it.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我想相关部分是
case FileSystemObject(路径) => {
if(路径.isFile) {
累加器!文件内容(路径.toString)
发件人!已加工
}
}
这里,不是“普通文件”的东西,例如目录,不会被发送到累加器。
因此,如果“d:/rails/a”中有子目录,则测试
totalFilesProcessed == files.length
将始终失败。I guess that the relevant part is
case FileSystemObject(path) => {
if(path.isFile) {
accumulator ! FileContent(path.toString)
sender ! Processed
}
}
Here things that are not "normal Files", e.g., directories, are not sent to the accumulator.
So if there subdirectories in your "d:/rails/a" the test
totalFilesProcessed == files.length
will always fail.