Creating something like a pipeline with Scala Actors
I have been struggling with the following problem for a week now and need some advice.
def query(title: String): List[Search] // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]
def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]
def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]
def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]
def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]
I want to construct a Pipeline like:
query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate (collected-meta-infos-state per query)
=> List[ TerminatorI-List[MetaInfo], TerminatorII-List[MetaInfo], ...]
So far, I have implemented every pipeline segment as an actor.
I need to create dedicated actor instances for every query, because some of those actors, such as filterXXX and consolidate, need to maintain state per query.
Functions like askIMDB produce multiple results, which I want to process concurrently (each in a separate actor). As a result, I have found neither a way to pre-construct the whole graph of actors before executing query() nor an elegant way to modify it at runtime.
My first attempt was a chain of actors that passed something like transaction IDs in the messages, so each actor held a Map[TransactionID->State], but this felt rather ugly.
My second attempt was to create a sort-of-pipeline that abstracts the digraph of actors into one flow, but I have failed so far.
This is my first post, so sorry if I forgot something or if the question is too general or too pseudo-coded. Any advice is very much appreciated. Thanks!
Comments (1)
I suggest you take a look at ScalaQuery, which does about the same thing. It can do so because this is a monad problem. In fact, some Haskell solutions such as Arrows, which are implemented by the Scalaz library, seem to be pretty close.
That would be the best solution, as the proper abstraction will make changes easier in the future.
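To illustrate the "monad problem" remark, here is a minimal sketch that composes the stages with plain `List` for-comprehensions (that is, `flatMap`). It is not the code from ScalaQuery or Scalaz. The case-class fields, the stub bodies, and the helper names `dedupe` and `run` are assumptions made up for this example; only the function names and signatures come from the question, and `filterRedundantSearchResults` is replaced here by a fold that carries the per-query "already searched" set. The sketch runs sequentially, so it shows the shape of the composition rather than the concurrency.

```scala
object PipelineSketch {
  // Placeholder types: the real fields are not given in the question.
  final case class Search(title: String)
  final case class SearchResult(source: String, title: String)
  final case class MetaInfo(source: String, title: String)

  // Stub stages standing in for the real lookups (assumed behaviour).
  def query(title: String): List[Search] =
    List(Search(title + " I"), Search(title + " II"))
  def searchIMDB(s: Search): List[SearchResult] = List(SearchResult("IMDB", s.title))
  def searchTMDB(s: Search): List[SearchResult] = List(SearchResult("TMDB", s.title))
  def fetchIMDB(sr: SearchResult): List[MetaInfo] = List(MetaInfo("IMDB", sr.title))
  def fetchTMDB(sr: SearchResult): List[MetaInfo] = List(MetaInfo("TMDB", sr.title))

  // Per-query filter state lives in the fold, not in a long-lived actor.
  def dedupe(results: List[SearchResult]): List[SearchResult] =
    results.foldLeft((Set.empty[String], List.empty[SearchResult])) {
      case ((seen, acc), sr) =>
        if (seen(sr.title)) (seen, acc) else (seen + sr.title, sr :: acc)
    }._2.reverse

  // Group the collected meta infos by title, keeping first-seen order.
  def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]] =
    infos.map(_.title).distinct.map(t => infos.filter(_.title == t))

  def run(title: String): List[List[MetaInfo]] = {
    val searchers = List[Search => List[SearchResult]](searchIMDB, searchTMDB)
    val fetchers  = List[SearchResult => List[MetaInfo]](fetchIMDB, fetchTMDB)

    // Fan out: every search term goes to every searcher.
    val found = for {
      s      <- query(title)
      search <- searchers
      sr     <- search(s)
    } yield sr

    // Fan out again: every surviving result goes to every fetcher.
    val infos = for {
      sr    <- dedupe(found)
      fetch <- fetchers
      info  <- fetch(sr)
    } yield info

    consolidate(infos)
  }

  def main(args: Array[String]): Unit =
    run("Terminator").foreach(println)
}
```

Because each call to `run` builds its own intermediate lists, the per-query state never has to be keyed by a transaction ID. Making the fan-out steps concurrent would require swapping `List` for some asynchronous container, which this sketch does not attempt.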
As a hack, I figure something like this:
EDIT
You can guarantee ordering too, with a bit of a trick. I'm trying to avoid Scala 2.8 here, though its named and default parameters would make this much easier.
Now, Searcher actors keep a list of filters, a list of fetchers, and a reference to the consolidator. They listen for messages informing them of these things, and for the query. For each result, they create a Filter actor for every filter in the list, send each of them the list of fetchers and the consolidator, and then send them the result.
Filter actors keep a list of fetchers and a reference to the consolidator. They listen for messages informing them of these things, and for the result of the searcher. They send their output, if any, to newly created Fetcher actors, which are first informed of the consolidator.
Fetchers keep a reference to the consolidator. They listen for a message informing them of that reference, and for the result from the filter. They send their result, in turn, to the consolidator.
The consolidator listens for two messages. One message, coming from Fetcher actors, informs it of results, which it accumulates. The other message, coming from the query, asks for the accumulated result, which the consolidator returns.
The only thing left is devising a way to let the consolidator know that all results have been processed. One way would be the following:
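As a hedged illustration of one possible scheme (not necessarily the one the answer had in mind), the consolidator can count outstanding tasks: every stage announces how many child tasks it is about to create, and every task reports when it finishes. The message names `Expect`, `Result` and `Done`, and the use of `String` for meta infos, are assumptions invented for this sketch. The state is written as a plain class so the counting logic can be run directly; inside an actor, `handle` would be the body of the receive loop.

```scala
// Hypothetical message protocol; none of these names come from the original post.
sealed trait Msg
final case class Expect(n: Int) extends Msg               // sender is about to fan out into n tasks
final case class Result(infos: List[String]) extends Msg  // a fetcher delivers its meta infos
case object Done extends Msg                              // one task has finished

// Per-query consolidator state.
final class Consolidator {
  private var pending = 1                     // the query itself counts as one open task
  private var collected = List.empty[String]

  def handle(msg: Msg): Unit = msg match {
    case Expect(n)     => pending += n
    case Result(infos) => collected = collected ++ infos
    case Done          => pending -= 1
  }

  def isComplete: Boolean = pending == 0
  def results: List[String] = collected
}

object CompletionDemo {
  def main(args: Array[String]): Unit = {
    val c = new Consolidator
    // Searcher found two results: announce two filters, then finish.
    c.handle(Expect(2)); c.handle(Done)
    // First filter passes its result on to one fetcher, then finishes.
    c.handle(Expect(1)); c.handle(Done)
    // Second filter drops its result as redundant.
    c.handle(Done)
    println(c.isComplete)   // one fetcher is still running
    // The fetcher delivers and finishes.
    c.handle(Result(List("Terminator I"))); c.handle(Done)
    println(c.isComplete)
    println(c.results)
  }
}
```

The scheme relies on each stage sending `Expect` to the consolidator before it creates or messages its children, so that the count cannot reach zero while work is still in flight. Whether that ordering holds depends on the delivery guarantees of the actor library in use, which this sketch does not model.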