Criticize this Python code (crawler with a thread pool)
How good is this Python code? It needs criticism. There is a bug in it: sometimes the script prints "ALL WAIT - CAN FINISH!" and then freezes (nothing more happens), but I can't find the reason why this happens.
Site crawler with a thread pool:
import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread
W_WAIT = 1
W_WORK = 0
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, pool, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
self.pool = pool
self.state = None
def is_wait(self):
return self.state == W_WAIT
def run(self):
while True:
#if all workers wait - time to exsit
print "CHECK WAIT: !!! ",self.pool.is_all_wait()
if self.pool.is_all_wait():
print "ALL WAIT - CAN FINISH!"
return
try:
func, args, kargs = self.tasks.get(timeout=3)
except Empty:
print "task wait timeout"
continue
self.state = W_WORK
print "START !!! in thread %s" % str(self)
#print args
try: func(*args, **kargs)
except Exception, e: print e
print "!!! STOP in thread %s", str(self)
self.tasks.task_done()
self.state = W_WAIT
#threads can fast empty it!
#if self.tasks.qsize() == 0:
# print "QUIT!!!!!!"
# break
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
#self.tasks = Queue(num_threads)
self.tasks = Queue()
self.workers = []
for _ in range(num_threads):
self.workers.append(Worker(self,self.tasks))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def is_all_wait(self):
for w in self.workers:
if not w.is_wait():
return False
return True
visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0
def process(pool,host,url):
try:
content = urlopen(url).read()
except UnicodeDecodeError:
return
for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
try:
href = link['href']
except KeyError:
continue
if not href.startswith('http://'):
href = 'http://%s%s' % (host, href)
if not href.startswith('http://%s%s' % (host, '/')):
continue
internal_links_set.add(href)
if href not in visited:
visited.add(href)
pool.add_task(process,pool,host,href)
else:
pass
def start(host,charset):
pool = ThreadPool(20)
pool.add_task(process,pool,host,'http://%s/' % (host))
pool.wait_completion()
start('evgenm.com','utf8')
Thanks for the help! I made a new implementation. What can you say about this code #2?
==================================TRY #2=======================================
import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread
W_STOP = 1
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, pool, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.pool = pool
self.state = None
self.start()
def stop(self):
self.state = W_STOP
def run(self):
while True:
if self.state == W_STOP:
print "\ncalled stop"
break
try:
func, args, kargs = self.tasks.get(timeout=3)
except Empty:
continue
print "\n***START*** %s" % str(self)
try:
func(*args, **kargs)
except Exception, e:
print e
print "\n***STOP*** %s", str(self)
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
#self.tasks = Queue(num_threads)
self.tasks = Queue()
self.workers = []
for _ in range(num_threads):
self.workers.append(Worker(self,self.tasks))
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def stop_threads(self):
for w in self.workers:
w.stop()
def wait_stop(self):
self.wait_completion()
self.stop_threads()
visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0
def process(pool,host,url):
try:
content = urlopen(url).read()
except UnicodeDecodeError:
return
for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
try:
href = link['href']
except KeyError:
continue
if not href.startswith('http://'):
href = 'http://%s%s' % (host, href)
if not href.startswith('http://%s%s' % (host, '/')):
continue
internal_links_set.add(href)
if href not in visited:
visited.add(href)
pool.add_task(process,pool,host,href)
else:
pass
def start(host,charset):
pool = ThreadPool(20)
pool.add_task(process,pool,host,'http://%s/' % (host))
pool.wait_stop()
start('evgenm.com','utf8')
Comments (2)
You are sharing state between threads (i.e., in is_all_wait) without synchronization. Plus, the fact that all threads are "waiting" is not a reliable indicator that the queue is empty (for instance, they could all be in the process of getting a task). I suspect that, occasionally, threads are exiting before the queue is truly empty. If this happens often enough, you will be left with tasks in the queue but no threads to run them. So queue.join() will wait forever.
My recommendation is:
- Get rid of is_all_wait -- it's not a reliable indicator.
- Get rid of the worker state -- it's not really necessary.
- Rely on queue.join to let you know when everything is processed.
- If you need to kill the threads (for example, if this is part of a larger, long-running program), then do so after the queue.join().
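For illustration only, here is a minimal sketch of that shape, kept in the same Python 2 / Queue style as the question (untested; the Worker and ThreadPool names simply mirror the question's classes). The workers carry no state flag, block on get(), and the pool relies on tasks.join() alone; because the workers are daemon threads, they die when the main thread exits after the join.

from Queue import Queue
from threading import Thread

class Worker(Thread):
    """Consume tasks from the shared queue until the process exits."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True          # daemon workers die with the main thread
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()   # block until a task arrives
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()             # always balance the get()

class ThreadPool:
    """Pool of daemon workers fed from a single Queue."""
    def __init__(self, num_threads):
        self.tasks = Queue()
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        # Returns only after every put() has been matched by task_done(),
        # even when running tasks add new tasks to the queue.
        self.tasks.join()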
I have basic Python knowledge, but isn't threading in Python useless? I've seen tons of articles criticizing the global interpreter lock for this.