App Engine DeadlineExceededError for cron job and task queue of a Wikipedia crawler
I am trying to build a Wikipedia link crawler on Google App Engine and store the index in the datastore, but I run into DeadlineExceededError for both cron jobs and the task queue.
For the cron job I have this code:
def buildTree(self):
    start = time.time()
    self.log.info(" Start Time: %f" % start)
    nobranches = TreeNode.all()
    for tree in nobranches:
        if tree.branches == []:
            self.addBranches(tree)
            time.sleep(1)
        if (time.time() - start) > 10:
            break
    self.log.info("Time Eclipsed: %f" % (time.time() - start))
    self.log.info(" End Time: %f" % time.clock())
I don't understand why the for loop doesn't break after 10 seconds. It does on the dev server. Something must be wrong with the time.time() on the server. Is there another function I can use?
For the task queue I have this code:
def addNewBranch(self, keyword, level=0):
    self.log.debug("Add Tree")
    self.addBranches(keyword)
    t = TreeNode.gql("WHERE name=:1", keyword).get()
    branches = t.nodes
    if level < 3:
        for branch in branches:
            if branch.branches == []:
                taskqueue.add(url="/addTree/%s" % branch.name)
                self.log.debug("url:%s" % "/addTree/%s" % branch.name)
The logs show that they both run into DeadlineExceededError. Shouldn't background processing have a longer deadline than the 30 seconds allowed for a page request? Is there a way around the exception?
Here is the code for addBranches():
def addBranches(self, keyword):
    tree = TreeNode.gql("WHERE name=:1", keyword).get()
    if tree is None:
        tree = TreeNode(name=keyword)
    self.log.debug("in addBranches arguments: tree %s", tree.name)
    t = urllib2.quote(tree.name.encode('utf8'))
    s = "http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
    self.log.debug(s)
    try:
        usock = urllib2.urlopen(s)
    except:
        self.log.error("Could not retrieve doc: %s" % tree.name)
        usock = None
    if usock is not None:
        try:
            xmldoc = minidom.parse(usock)
        except Exception, error:
            self.log.error("Parse Error: %s" % error)
            return None
        usock.close()
        try:
            pyNode = xmldoc.getElementsByTagName('pl')
            self.log.debug("Nodes to be added: %d" % pyNode.length)
        except Exception, e:
            pyNode = None
            self.log.error("Getting Nodes Error: %s" % e)
            return None
        newNodes = []
        if pyNode is not None:
            for child in pyNode:
                node = None
                node = TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()
                if node is None:
                    newNodes.append(TreeNode(name=child.attributes["title"].value))
                else:
                    tree.branches.append(node.key())
            db.put(newNodes)
            for node in newNodes:
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % node.name)
            tree.put()
            return tree.branches
I have had great success with datetimes on GAE. Take a datetime.datetime.now() timestamp when you start, subtract it from a fresh datetime.datetime.now() after each unit of work, and the resulting time_taken will be a timedelta. You can compare it against another timedelta that has the duration you are interested in.
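For instance, the loop in buildTree could track its budget like this (a sketch only; the 10-second limit is just an example value):

import datetime

start = datetime.datetime.now()
for tree in nobranches:
    if tree.branches == []:
        self.addBranches(tree)
    time_taken = datetime.datetime.now() - start
    # Stop well before the request deadline; the threshold is illustrative.
    if time_taken > datetime.timedelta(seconds=10):
        break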
It sounds like you would be far better served using mapreduce or Task Queues. Both are great fun for dealing with huge numbers of records.
A cleaner pattern for the code you have is to fetch only a batch of records, say 100 at a time. If you get a full 100 back, then when you are done you can throw another task on the queue to launch off more.
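A sketch of that batched fetch, reusing the TreeNode model and addBranches from the question (the /buildTree URL for the follow-up task is illustrative):

# Pull a bounded batch instead of iterating over the whole query.
nobranches = TreeNode.all().fetch(100)
for tree in nobranches:
    if tree.branches == []:
        self.addBranches(tree)
# A full batch suggests there is more work; enqueue another task to continue.
if len(nobranches) == 100:
    taskqueue.add(url="/buildTree")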
-- Based on comment about needing trees without branches --
I do not see your model up there, but if I were trying to create a list of all of the trees without branches and process them, I would: fetch the keys only for trees in blocks of 100 or so; then fetch all of the branches that belong to those trees using an IN query, ordered by tree key; scan the list of branches and, the first time you find a tree's key, pull that tree's key from the list. When done, you will have a list of "branchless" tree keys. Schedule each one of them for processing.
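Roughly, and only as a sketch: the Branch model and its tree reference property below are assumptions (the question never shows a separate branch model), a set stands in for the ordered scan, and the key list is chunked because datastore IN filters are capped at 30 values:

# Hypothetical model: Branch with a 'tree' ReferenceProperty to TreeNode.
tree_keys = TreeNode.all(keys_only=True).fetch(100)
remaining = set(tree_keys)

for i in range(0, len(tree_keys), 30):
    chunk = tree_keys[i:i + 30]
    for branch in Branch.all().filter('tree IN', chunk):
        # Seeing any branch for a tree means that tree is not branchless.
        remaining.discard(Branch.tree.get_value_for_datastore(branch))

branchless_keys = list(remaining)
# Schedule each of these keys for processing, e.g. with taskqueue.add().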
A simpler version is to use MapReduce on the trees. For each tree, find one branch that matches its ID. If you cannot, flag the tree for follow up. By default, this function will pull batches of trees (I think 25) with 8 simultaneous workers. And, it manages the job queues internally so you don't have to worry about timing out.
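A rough sketch of that mapper, assuming the appengine-mapreduce library is wired up in mapreduce.yaml with a DatastoreInputReader over TreeNode (needs_followup is a hypothetical flag property, not part of the question's model):

from mapreduce import operation as op

def flag_branchless(tree):
    # Called once per TreeNode entity fed in by the input reader.
    if not tree.branches:
        tree.needs_followup = True  # hypothetical BooleanProperty on TreeNode
        yield op.db.Put(tree)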
There is not a way "around" the deadline exception aside from making your code execute within the proper timeframe.
The problem here is that you're doing a query operation for every link in your document. Since Wikipedia pages can contain a lot of links, this means a lot of queries - and hence, you run out of processing time. This approach is also going to consume your quota at a fantastic rate!
Instead, you should use the page name of the Wikipedia page as the key name of the entity. Then, you can collect up all the links from the document into a list, construct keys from them (which is an entirely local operation), and do a single batch db.get for all of them. Once you've updated and/or created them as appropriate, you can do a batch db.put to store them all to the datastore - reducing your total datastore operations from numlinks*2 to just 2!
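A sketch of that pattern, assuming the TreeNode model from the question and using the page title as the key name:

# Build keys locally from the link titles; no datastore round trips yet.
titles = [child.attributes["title"].value for child in pyNode]
keys = [db.Key.from_path('TreeNode', title) for title in titles]

nodes = db.get(keys)   # one batch get covering every link
to_put = []
for title, node in zip(titles, nodes):
    if node is None:
        node = TreeNode(key_name=title, name=title)
    to_put.append(node)

db.put(to_put)         # one batch put to store them all
tree.branches = [node.key() for node in to_put]
tree.put()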
When a DeadlineExceededError happens, you want the request to eventually succeed if it is called again. This may require that your crawling state is guaranteed to have made some progress that can be skipped the next time. (Not addressed here.)
Parallelized calls can help tremendously.
Urlfetch:
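For example, App Engine's asynchronous urlfetch API lets several Wikipedia requests run at once (a sketch; parallel_fetch and the urls list are illustrative names):

from google.appengine.api import urlfetch

def parallel_fetch(urls):
    rpcs = []
    for url in urls:
        rpc = urlfetch.create_rpc(deadline=10)
        urlfetch.make_fetch_call(rpc, url)
        rpcs.append(rpc)
    # All requests are now in flight; block on each one only when collecting.
    return [rpc.get_result() for rpc in rpcs]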
Datastore:
Combine the entities being put into a single round-trip call.
Pull the TreeNode.gql lookups from inside the loop up into a parallel query tool like asynctools:
http://asynctools.googlecode.com
Asynctools example:
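Something along these lines, assuming asynctools' AsyncMultiTask/QueryTask interface (the exact signatures may differ from the project's docs, and keywords is an illustrative list):

from asynctools import AsyncMultiTask, QueryTask

# Run the per-keyword GQL lookups in parallel instead of one .get() at a time.
runner = AsyncMultiTask()
for keyword in keywords:
    runner.append(QueryTask(TreeNode.gql("WHERE name=:1", keyword), limit=1))
runner.run()

# Each task holds its query's results once run() returns.
results = [task.get_result() for task in runner]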
DISCLOSURE: I am associated with asynctools.