烦人的 Twisted Python 问题
出于个人兴趣,我尝试回答以下问题: 最快的方法是什么在 Python 中发送 100,000 个 HTTP 请求?
这就是我到目前为止所想到的,但我遇到了一些非常奇怪的事情。
当installSignalHandlers为True时,它就会挂起。我可以看到 DelayedCall
实例位于 reactor._newTimedCalls
中,但 processResponse
永远不会被调用。
当installSignalHandlers为False时,它会抛出错误并起作用。
from twisted.internet import reactor
from twisted.web.client import Agent
from threading import Semaphore, Thread
import time
concurrent = 100
s = Semaphore(concurrent)
reactor.suggestThreadPoolSize(concurrent)
t=Thread(
target=reactor.run,
kwargs={'installSignalHandlers':True})
t.daemon=True
t.start()
agent = Agent(reactor)
def processResponse(response,url):
print response.code, url
s.release()
def processError(response,url):
print "error", url
s.release()
def addTask(url):
req = agent.request('HEAD', url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
for url in open('urllist.txt'):
addTask(url.strip())
s.acquire()
while s._Semaphore__value!=concurrent:
time.sleep(0.1)
reactor.stop()
这是当 installSignalHandlers 为 True 时抛出的错误: (注意:这是预期的行为!问题是为什么当 installSignalHandlers 为 False 时它不起作用。)
Traceback (most recent call last):
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 396, in fireEvent
DeferredList(beforeResults).addCallback(self._continueFiring)
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 224, in addCallback
callbackKeywords=kw)
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 213, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 371, in _runCallbacks
self.result = callback(self.result, *args, **kw)
--- <exception caught here> ---
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 409, in _continueFiring
callable(*args, **kwargs)
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1165, in _reallyStartRunning
self._handleSignals()
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1105, in _handleSignals
signal.signal(signal.SIGINT, self.sigInt)
exceptions.ValueError: signal only works in main thread
我做错了什么,正确的方法是什么? 我是扭曲的新手。
@莫谢兹: 谢谢。现在可以运行了:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 100
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
I'm trying to answer the following question out of personal interest:
What is the fastest way to send 100,000 HTTP requests in Python?
And this is what I have came up so far, but I'm experiencing something very stange.
When installSignalHandlers is True, it just hangs. I can see that the DelayedCall
instances are in reactor._newTimedCalls
, but processResponse
never gets called.
When installSignalHandlers is False, it throws an error and works.
from twisted.internet import reactor
from twisted.web.client import Agent
from threading import Semaphore, Thread
import time
concurrent = 100
s = Semaphore(concurrent)
reactor.suggestThreadPoolSize(concurrent)
t=Thread(
target=reactor.run,
kwargs={'installSignalHandlers':True})
t.daemon=True
t.start()
agent = Agent(reactor)
def processResponse(response,url):
print response.code, url
s.release()
def processError(response,url):
print "error", url
s.release()
def addTask(url):
req = agent.request('HEAD', url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
for url in open('urllist.txt'):
addTask(url.strip())
s.acquire()
while s._Semaphore__value!=concurrent:
time.sleep(0.1)
reactor.stop()
And here is the error that it throws when installSignalHandlers is True:
(Note: This is the expected behaviour! The question is why it doesn't work when installSignalHandlers is False.)
Traceback (most recent call last):
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 396, in fireEvent
DeferredList(beforeResults).addCallback(self._continueFiring)
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 224, in addCallback
callbackKeywords=kw)
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 213, in addCallbacks
self._runCallbacks()
File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 371, in _runCallbacks
self.result = callback(self.result, *args, **kw)
--- <exception caught here> ---
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 409, in _continueFiring
callable(*args, **kwargs)
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1165, in _reallyStartRunning
self._handleSignals()
File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1105, in _handleSignals
signal.signal(signal.SIGINT, self.sigInt)
exceptions.ValueError: signal only works in main thread
What am I doing wrong and what is the right way? I'm new to twisted.
@moshez:
Thanks. It works now:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 100
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您从主线程使用了太多的“反应器调用”(例如,agent.request 很有可能调用反应器)。我不确定这是否是您的问题,但它仍然不受支持——从非反应器线程进行的唯一反应器调用是reactor.callFromThread。
而且,整个架构看起来很奇怪。为什么不在主线程上运行反应器?从反应器中读取包含 10,000 个请求的整个文件并将其拆分应该不是问题,即使您一次完成所有操作。
您可能可以找到不使用任何线程的纯 Twisted 解决方案。
You're using waaaaay too much "reactor calls" (for example, there's a good chance that agent.request calls into the reactor) from the main thread. I'm not sure if that's your problem, but it's still not supported -- the only reactor calls to make from the non-reactor thread is reactor.callFromThread.
Also, the whole architecture seems strange. Why are you not running the reactor on the main thread? Reading a whole file with 10,000 requests, and splitting them, should not be a problem to do from the reactor, even if you do it all at once.
You can probably hit a pure-Twisted solution not using any threads.