Using Celery as a control channel for Twisted applications
I am trying to use Celery as the control channel for a Twisted application. My Twisted application is an abstraction layer that provides a standard interface to various locally running processes (via ProcessProtocol). I would like to use Celery to control this remotely - AMQP seems like the ideal method of controlling many Twisted apps from a central location, and I would like to take advantage of Celery's task-based features, e.g. task retries, subtasks etc.
This is not working as I had planned, and I am hoping someone can help point me in the right direction to get this working.
The behaviour I am trying to achieve when I run my script is:
- Start a slightly-modified celeryd (see below)
- Wait for Celery tasks
- When the 'start process' task is received, spawn a ProcessProtocol
- When other tasks are received, run a function on the Twisted protocol and return the result using Deferreds
The 'slightly-modified celeryd' is celeryd with a small modification that allows tasks to access the Twisted reactor via self.app.twisted, and the spawned process via self.app.proc. To keep things simple I am using Celery's 'solo' process pool implementation, which does not fork a new process for task workers.
My problem occurs when I try to use a Celery task to initialise the ProcessProtocol (i.e. start the external process). The process starts correctly, but the ProcessProtocol's childDataReceived never gets called. I think this has something to do with file descriptors not being inherited or set up correctly.
Below is some sample code, based on the 'wc' example in the ProcessProtocol documentation. It includes two Celery tasks - one to start the wc process, and another to count the words in some text (using the previously-started wc process).
This example is rather contrived, but if I can get it working it will serve as a good starting point for implementing my ProcessProtocols, which wrap long-running processes that respond to commands written to stdin.
I am testing this by running the Celery daemon first:
python2.6 myceleryd.py -l info -P solo
Then, in another window, running a script which sends two tasks:
python2.6 command_test.py
The expected behaviour of command_test.py is for two commands to execute - one starts the wc process, and the other sends some text to CountWordsTask. What actually happens is:
- The StartProcTask spawns the process, and receives 'process started' as a response via a Deferred
- The CountWordsTask never receives a result, because childDataReceived is never called
Can anyone shed some light on this, or offer some advice on how best to use Celery as a control channel for Twisted ProcessProtocols?
Would it be better to write a Twisted-backed ProcessPool implementation for Celery? Is my method of calling WorkerCommand.execute_from_commandline via reactor.callLater the right approach to ensure everything happens within the Twisted thread?
I have read about AMPoule, which I think could provide some of this functionality, but would like to stick with Celery if possible as I use it in other parts of my application.
Any help or assistance will be gratefully appreciated!
myceleryd.py
from functools import partial

from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        # Expose the Twisted reactor to tasks as self.app.twisted
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)


def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    # Start the Celery worker one second after the reactor starts running
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()


if __name__ == '__main__':
    main()
protocol.py
import sys

from twisted.internet import protocol
from twisted.internet.defer import Deferred


class WCProcessProtocol(protocol.ProcessProtocol):
    def __init__(self, text):
        self.text = text
        self._waiting = {}  # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        # Called once the child process has been started
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        # Split wc's output into three equal-width fields: lines, words, chars
        fieldLength = len(data) // 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)
        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'

    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        # Note: wc only writes its counts once it sees EOF on stdin
        self.transport.write(text)
        return self._waiting['countWords']
tasks.py
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor


class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']


class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')
Celery probably blocks while waiting for new messages from the network. Since you're running it in one single-threaded process along with the Twisted reactor, it blocks the reactor from running. This will disable most of Twisted, which requires the reactor to actually run (you called reactor.run, but with Celery blocking it, it is effectively not running).
reactor.callLater only delays the startup of Celery. Once Celery starts, it's still blocking the reactor. The problem you need to avoid is blocking the reactor.
One solution would be to run Celery in one thread and the reactor in another thread. Use reactor.callFromThread to send messages to Twisted ("call functions in the reactor thread") from the Celery thread. Use the Celery equivalent if you need to send messages back to Celery from the Twisted thread.
Another solution would be to implement the Celery protocol (AMQP? - see txAMQP) as a native Twisted library and use that to process Celery messages without blocking.