App Engine DeadlineExceededError for a Wikipedia crawler's cron job and task queue

Posted 2024-09-27 04:28:12

I am trying to build a Wikipedia link crawler on Google App Engine. I want to store an index in the datastore, but I run into DeadlineExceededError for both cron jobs and the task queue.

For the cron job I have this code:


def buildTree(self):

    start=time.time()
    self.log.info(" Start Time: %f" % start)
    nobranches=TreeNode.all()       

    for tree in nobranches:            
        if tree.branches==[]:
            self.addBranches(tree)
            time.sleep(1)
        if (time.time()-start) > 10 :                
            break
        self.log.info("Time Eclipsed: %f" % (time.time()-start))

    self.log.info(" End Time:%f" % time.clock())

I don't understand why the for loop doesn't break after 10 seconds. It does on the dev server, so something must be wrong with time.time() on the server. Is there another function I can use?

For the task queue I have this code:

def addNewBranch(self, keyword, level=0):

    self.log.debug("Add Tree")        
    self.addBranches(keyword)

    t=TreeNode.gql("WHERE name=:1", keyword).get()
    branches=t.nodes

    if level < 3:
        for branch in branches:
            if branch.branches == []:
                taskqueue.add(url="/addTree/%s" % branch.name)
                self.log.debug("url:%s" % "/addTree/%s" % branch.name)

The logs show that they both run into the DeadlineExceededError. Shouldn't background processing have a longer deadline than the 30 seconds for a page request? Is there a way around the exception?

Here is the code for addBranches():

def addBranches(self, keyword):

    tree=TreeNode.gql("WHERE name=:1", keyword).get()
    if tree is None:
        tree=TreeNode(name=keyword)


    self.log.debug("in addBranches arguments: tree %s", tree.name)     
    t=urllib2.quote(tree.name.encode('utf8'))
    s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
    self.log.debug(s)
    try:        
        usock = urllib2.urlopen(s)       
    except :        

        self.log.error( "Could not retrieve doc: %s" % tree.name)
        usock=None

    if usock is not None:

        try:
            xmldoc=minidom.parse(usock)
        except Exception , error:
            self.log.error("Parse Error: %s" % error) 
            return None   
        usock.close()            
        try:
            pyNode= xmldoc.getElementsByTagName('pl')  
            self.log.debug("Nodes to be added: %d" % pyNode.length)
        except Exception, e:
            pyNode=None
            self.log.error("Getting Nodes Error: %s" % e)
            return None
        newNodes=[]    
        if pyNode is not None:
            for child in pyNode: 
                node=None             
                node= TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()

                if node is None:
                    newNodes.append(TreeNode(name=child.attributes["title"].value))           

                else:
                    tree.branches.append(node.key())  
            db.put(newNodes)
            for node in newNodes:
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % node.name)                    
            tree.put()
            return tree.branches 

Comments (4)

猫卆 2024-10-04 04:28:12

I have had great success with datetimes on GAE.

from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start

time_taken will be a timedelta. You can compare it against another timedelta that has the duration you are interested in.

ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
    pass  # do something quick: wrap up and reschedule the rest

It sounds like you would be far better served using MapReduce or the Task Queue. Both are well suited to dealing with huge numbers of records.

A cleaner pattern for the code you have is to fetch only some records.

nobranches=TreeNode.all().fetch(100)

This code will pull only 100 records. If you get a full 100, then when you are done you can throw another task on the queue to launch more, as in the sketch below.
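
For example, a rough sketch of that continuation (the /buildtree URL, handler name, and the call into your addBranches routine are illustrative, not tested):

    from google.appengine.api import taskqueue
    from google.appengine.ext import webapp

    class BuildTreeWorker(webapp.RequestHandler):
        def post(self):
            query = TreeNode.all()
            cursor = self.request.get('cursor')
            if cursor:
                query.with_cursor(cursor)      # resume where the last task stopped
            trees = query.fetch(100)
            for tree in trees:
                if tree.branches == []:
                    addBranches(tree)          # your existing routine (illustrative call)
            if len(trees) == 100:              # a full batch, so there is probably more
                taskqueue.add(url='/buildtree',
                              params={'cursor': query.cursor()})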

-- Based on comment about needing trees without branches --

I do not see your model up there, but if I were trying to create a list of all of the trees without branches and process them, I would: fetch the keys only for trees in blocks of 100 or so. Then I would fetch all of the branches that belong to those trees using an IN query, ordered by tree key. Scan the list of branches, and the first time you find a tree's key, pull that key from the list. When done, you will have a list of "branchless" tree keys. Schedule each one of them for processing.
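
For illustration, a rough sketch of that scan, assuming a hypothetical Branch kind with a tree reference property back to TreeNode (your actual model isn't shown):

    # TreeNode and Branch are the (hypothetical) db.Model classes
    def branchless_tree_keys(batch_size=100):
        tree_keys = TreeNode.all(keys_only=True).fetch(batch_size)
        seen = set()
        # IN queries accept at most 30 values, so chunk the keys
        for i in range(0, len(tree_keys), 30):
            chunk = tree_keys[i:i + 30]
            for branch in Branch.all().filter('tree IN', chunk):
                # read the raw key off the reference without dereferencing it
                seen.add(Branch.tree.get_value_for_datastore(branch))
        return [k for k in tree_keys if k not in seen]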

A simpler version is to use MapReduce on the trees. For each tree, find one branch that matches its ID; if you cannot, flag the tree for follow-up. By default, the framework pulls trees in batches (of 25, I think) with 8 simultaneous workers, and it manages the job queues internally so you don't have to worry about timing out.
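
Roughly, the mapper could look like this (using the appengine-mapreduce operation API; the needs_followup flag is a hypothetical property on your model):

    from mapreduce import operation as op

    def flag_branchless(tree):
        # called once per TreeNode entity by the mapreduce framework
        if tree.branches == []:
            tree.needs_followup = True    # hypothetical BooleanProperty on the model
            yield op.db.Put(tree)         # the framework batches these puts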

旧伤慢歌 2024-10-04 04:28:12

There is not a way "around" the deadline exception aside from making your code execute within the proper timeframe.

挽手叙旧 2024-10-04 04:28:12

The problem here is that you're doing a query operation for every link in your document. Since Wikipedia pages can contain a lot of links, that means a lot of queries, and hence you run out of processing time. This approach is also going to consume your quota at a fantastic rate!

Instead, you should use the page name of the Wikipedia page as the key name of the entity. Then, you can collect up all the links from the document into a list, construct keys from them (which is an entirely local operation), and do a single batch db.get for all of them. Once you've updated and/or created them as appropriate, you can do a batch db.put to store them all to the datastore - reducing your total datastore operations from numlinks*2 to just 2!
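
Roughly, assuming TreeNode is keyed by page title (key_name) and titles holds the link titles parsed from the XML, the whole update could look like this sketch:

    from google.appengine.ext import db

    def store_links(tree, titles):
        # building keys from key_names is a purely local operation, no RPC
        keys = [db.Key.from_path('TreeNode', t) for t in titles]
        nodes = db.get(keys)              # one batch get for every link
        new_nodes = [TreeNode(key_name=t, name=t)
                     for t, n in zip(titles, nodes) if n is None]
        tree.branches = keys              # keys are known without any query
        db.put(new_nodes + [tree])        # one batch put
        return tree.branches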

睫毛溺水了 2024-10-04 04:28:12

When a DeadlineExceededError happens, you want the request to eventually succeed if called again. This may require that your crawling state is guaranteed to have made some progress that can be skipped the next time. (Not addressed here.)

Parallelized calls can help tremendously.

  • Urlfetch
  • Datastore Put (batch mixed entities together using db.put)
  • Datastore Query (queries in parallel - asynctools)

Urlfetch:

  • When you make your urlfetch calls, be sure to use the asynchronous mode to collapse your loop (see the sketch below).
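
For example, a minimal sketch (fetch_all is an illustrative helper, not part of the urlfetch API):

    from google.appengine.api import urlfetch

    def fetch_all(urls):
        rpcs = []
        for url in urls:
            rpc = urlfetch.create_rpc(deadline=10)
            urlfetch.make_fetch_call(rpc, url)   # starts the fetch, returns at once
            rpcs.append(rpc)
        # all fetches are now in flight; block on each result in turn
        return [rpc.get_result() for rpc in rpcs]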

Datastore

  • Combine the entities being put into a single round-trip call.

    # put newNodes and tree in the same round trip; db.put takes a list,
    # so wrap the single tree entity in one
    db.put(newNodes + [tree])
    
  • Pull the TreeNode.gql from inside the loop up into a parallel query tool like asynctools:
    http://asynctools.googlecode.com

Asynctools Example

    if pyNode is not None:

        runner = AsyncMultiTask()
        titles = []                 # remember which title each task queried
        for child in pyNode:
            title = child.attributes["title"].value
            titles.append(title)
            query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
            runner.append(QueryTask(query, limit=1, client_state=title))

        # kick off the work
        runner.run()

        # peel out the results; a keys-only query yields a list of keys,
        # so take the first key, or None if the node does not exist yet
        node_keys = []
        for task in runner:
            result = task.get_result()  # re-raises any exception from that query
            node_keys.append(result[0] if result else None)

        newNodes = []
        for title, key in zip(titles, node_keys):
            if key is None:
                # a key_name makes the node's key known before the put
                node = TreeNode(key_name=title, name=title)
                newNodes.append(node)
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % title)
            else:
                tree.branches.append(key)

        # put newNodes and tree in a single round trip
        db.put(newNodes + [tree])
        return tree.branches

DISCLOSURE: I am associated with asynctools.
