烦人的 Twisted Python 问题

发布于 2024-08-28 23:35:29 字数 3534 浏览 9 评论 0原文

出于个人兴趣,我尝试回答以下问题: 最快的方法是什么在 Python 中发送 100,000 个 HTTP 请求?

这就是我到目前为止所想到的,但我遇到了一些非常奇怪的事情。

installSignalHandlersTrue时,它就会挂起。我可以看到 DelayedCall 实例位于 reactor._newTimedCalls 中,但 processResponse 永远不会被调用。

installSignalHandlersFalse时,它会抛出错误并起作用。

from twisted.internet import reactor
from twisted.web.client import Agent
from threading import Semaphore, Thread
import time

concurrent = 100
s = Semaphore(concurrent)
reactor.suggestThreadPoolSize(concurrent)
t=Thread(
    target=reactor.run,
    kwargs={'installSignalHandlers':True})
t.daemon=True
t.start()


agent = Agent(reactor)


def processResponse(response,url):
    print response.code, url
    s.release()

def processError(response,url):
    print "error", url
    s.release()

def addTask(url):
    req = agent.request('HEAD', url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)


for url in open('urllist.txt'):
    addTask(url.strip())    
    s.acquire()
while s._Semaphore__value!=concurrent:
    time.sleep(0.1)     

reactor.stop()

这是当 installSignalHandlers 为 True 时抛出的错误: (注意:这是预期的行为!问题是为什么当 installSignalHandlers 为 False 时它不起作用。)

Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 396, in fireEvent
    DeferredList(beforeResults).addCallback(self._continueFiring)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 224, in addCallback
    callbackKeywords=kw)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 213, in addCallbacks
    self._runCallbacks()
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 371, in _runCallbacks
    self.result = callback(self.result, *args, **kw)
--- <exception caught here> ---
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 409, in _continueFiring
    callable(*args, **kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1165, in _reallyStartRunning
    self._handleSignals()
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1105, in _handleSignals
    signal.signal(signal.SIGINT, self.sigInt)
exceptions.ValueError: signal only works in main thread

我做错了什么,正确的方法是什么? 我是扭曲的新手。

@莫谢兹: 谢谢。现在可以运行了:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 100
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

I'm trying to answer the following question out of personal interest:
What is the fastest way to send 100,000 HTTP requests in Python?

And this is what I have came up so far, but I'm experiencing something very stange.

When installSignalHandlers is True, it just hangs. I can see that the DelayedCall instances are in reactor._newTimedCalls, but processResponse never gets called.

When installSignalHandlers is False, it throws an error and works.

from twisted.internet import reactor
from twisted.web.client import Agent
from threading import Semaphore, Thread
import time

concurrent = 100
s = Semaphore(concurrent)
reactor.suggestThreadPoolSize(concurrent)
t=Thread(
    target=reactor.run,
    kwargs={'installSignalHandlers':True})
t.daemon=True
t.start()


agent = Agent(reactor)


def processResponse(response,url):
    print response.code, url
    s.release()

def processError(response,url):
    print "error", url
    s.release()

def addTask(url):
    req = agent.request('HEAD', url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)


for url in open('urllist.txt'):
    addTask(url.strip())    
    s.acquire()
while s._Semaphore__value!=concurrent:
    time.sleep(0.1)     

reactor.stop()

And here is the error that it throws when installSignalHandlers is True:
(Note: This is the expected behaviour! The question is why it doesn't work when installSignalHandlers is False.)

Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 396, in fireEvent
    DeferredList(beforeResults).addCallback(self._continueFiring)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 224, in addCallback
    callbackKeywords=kw)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 213, in addCallbacks
    self._runCallbacks()
  File "/usr/lib/python2.6/dist-packages/twisted/internet/defer.py", line 371, in _runCallbacks
    self.result = callback(self.result, *args, **kw)
--- <exception caught here> ---
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 409, in _continueFiring
    callable(*args, **kwargs)
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1165, in _reallyStartRunning
    self._handleSignals()
  File "/usr/lib/python2.6/dist-packages/twisted/internet/base.py", line 1105, in _handleSignals
    signal.signal(signal.SIGINT, self.sigInt)
exceptions.ValueError: signal only works in main thread

What am I doing wrong and what is the right way? I'm new to twisted.

@moshez:
Thanks. It works now:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 100
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

怀念你的温柔 2024-09-04 23:35:29

您从主线程使用了太多的“反应器调用”(例如,agent.request 很有可能调用反应器)。我不确定这是否是您的问题,但它仍然不受支持——从非反应器线程进行的唯一反应器调用是reactor.callFromThread。

而且,整个架构看起来很奇怪。为什么不在主线程上运行反应器?从反应器中读取包含 10,000 个请求的整个文件并将其拆分应该不是问题,即使您一次完成所有操作。

您可能可以找到不使用任何线程的纯 Twisted 解决方案。

You're using waaaaay too much "reactor calls" (for example, there's a good chance that agent.request calls into the reactor) from the main thread. I'm not sure if that's your problem, but it's still not supported -- the only reactor calls to make from the non-reactor thread is reactor.callFromThread.

Also, the whole architecture seems strange. Why are you not running the reactor on the main thread? Reading a whole file with 10,000 requests, and splitting them, should not be a problem to do from the reactor, even if you do it all at once.

You can probably hit a pure-Twisted solution not using any threads.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文