使用 Scala Actors 创建像管道一样的东西

发布于 2024-08-15 04:49:05 字数 1288 浏览 6 评论 0原文

我现在正在为以下问题苦苦挣扎一周,需要一些建议。

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

我想构建一个像这样的管道:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

到目前为止,我已经将每个管道段实现为一个演员。 我需要为每个查询创建专用的参与者实例,因为其中一些参与者(例如filterXXX和consolidate)需要维护每个查询的状态。

像askIMDB这样的函数会产生多个结果,我想同时处理这些结果(每个结果都有一个单独的演员)。因此,我没有找到任何方法在执行 query() 之前预先构建整个参与者图,也没有找到在运行时修改它的优雅方法。

我的第一次尝试是一个 Actor 链,并在消息中传递诸如 Transaction-ID 之类的东西,因此每个 Actor 都有一个 Map[TransactionID->State] 但这感觉相当难看。 第二次尝试是创建一种管道,将参与者的有向图抽象为一个流程,但到目前为止我失败了。

这是我的第一篇文章,如果我忘记了什么或者问题是一般/伪编码的,抱歉。非常感谢任何建议。谢谢!

I am struggling with the following problem for a week by now and need some advice.

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

I want to construct a Pipeline like:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

So far, I have implemented every Pipeline-Segment as an Actor.
I need to create dedicated actor-instances for every Query, as some of those actors like filterXXX and consolidate need to maintain state per query.

Functions like askIMDB produce multiple results which I want to process concurrently (each to a seperate actor). So I have not found any way to pre-construct the whole graph of actors before executing the query() and neither an elegant way to modify it at runtime.

My first try was a chain of actors and passing sth like Transaction-IDs in the messages, so each Actor had a Map[TransactionID->State] but this felt rather ugly.
The second try was to create a sort-of-Pipeline abstracting the digraph of actors into one flow but I failed so far.

This is my first post, sorry if I forgot something or the question is to general/pseudo-coded. Any advice very much appreciated. Thanks!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

染柒℉ 2024-08-22 04:49:05

我建议你看看 ScalaQuery,它做同样的事情。它可以这样做,因为这是一个单子问题。事实上,一些 Haskell 解决方案(例如 Arrows)似乎是由 Scalaz 库 实现的非常接近。

这将是最好的解决方案,因为正确的抽象将使将来的更改变得更容易。

作为一个黑客,我想到了这样的事情:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

编辑

你也可以保证订购,只要有一点技巧。我在这里试图避免使用 Scala 2.8,尽管它可以通过命名参数和默认参数使这变得更容易。

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

现在,搜索者参与者保留过滤器列表、获取器列表以及对合并器的引用。他们收听通知他们这些事情的消息并进行查询。对于每个结果,他们为列表中的每个过滤器创建一个 Filter actor,向每个过滤器发送获取器和合并器的列表,然后向它们发送结果。

过滤器参与者保留一个获取器列表和对合并器的引用。他们聆听通知他们这些事情的消息,并寻找搜索者的结果。他们将输出(如果有)发送给新创建的提取器参与者,这些参与者首先被告知合并器。

获取器保留对合并器的引用。他们收听通知他们该引用的消息以及过滤器的结果。他们将结果发送给合并商。

集运商听取两条消息。来自 fetcher actor 的一条消息通知他们积累的结果。来自查询的另一条消息请求该结果并返回该结果。

剩下的唯一一件事就是设计一种方法,让合并商知道所有结果都已处理完毕。一种方法如下:

  1. 在查询中,将创建的每个搜索器告知合并器参与者。合并商保存它们的列表,并带有一个标志指示它们是否完成。
  2. 每个搜索器都会保留其创建的过滤器列表,并等待来自它们的“完成”消息。当搜索器没有剩余处理要做并且已从所有过滤器接收到“完成”时,它会向合并器发送一条消息,通知它已完成。
  3. 每个过滤器依次保存它所创建的获取器列表,并且同样等待来自它们的“完成”消息。当它完成处理并且从所有获取器接收到“完成”时,它通知搜索器它已经完成。
  4. 当其工作完成并发送到合并器时,它的获取器会向创建它的过滤器发送一条“完成”消息。
  5. 合并器仅在收到所有搜索者的“完成”后才侦听查询结果的消息。

I suggest you take a look at ScalaQuery, which does about the same thing. And it can do so, because this is a monad problem. In fact, some Haskell solutions such as Arrows, which are implemented by the Scalaz library, seems to be pretty close.

That would be the best solution, as the proper abstraction will make changes easier in the future.

As a hack, I figure something like this:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

EDIT

You can guarantee ordering too, with a bit of a trick. I'm trying to avoid Scala 2.8 here, though it can make this much easier with named and default parameters.

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

Now, Searchers actors keep a list of filters, a list of fetchers, and the reference to the consolidator. They listen to messages informing them of these things, and for the query. For each result, they create a Filter actor for every filter in the list, send each of them the list of fetchers and the consolidator, and then send them the result.

Filter actors keep a list of fetchers and a reference to the consolidator. They listen to messages informing them of these things, and for the result of the searcher. They send their output, if any, to newly created fetcher actors, who are first informed of the consolidator.

Fetchers keep a reference to the consolidators. They listen to a message informing them of that reference, and to the result from the filter. They send their result, in turn, to the consolidator.

The consolidator listen to two messages. One message, coming from fetcher actors, inform them of results, which they accumulate. Another message, coming from the Query, asks for that result, which it returns.

The only thing left is devising a way to let the consolidator know that all results have been processed. One way would be the following:

  1. In the Query, inform the Consolidator actor of every Searcher that was created. The consolidator keeps a list of them, with a flag indicating whether they are finished or not.
  2. Each searcher keeps a list of the filters it created, and waits for a "done" message from them. When a searcher has no processing left to do and has received "done" from all filters, it sends a message to the consolidator informing it that it has finished.
  3. Each filter, in turn, keeps a list of fetchers it has created, and, likewise, waits for "done" messages from them. When it has finished processing, and has received "done" from all fetchers, it informs the searcher that it has done.
  4. It fetcher sends a "done" message to the filter that has created it when its work is completed and sent to the consolidator.
  5. The consolidator only listens to the message querying the result after it has received a "done" from all searchers.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文