Google App Engine:如何使用 TaskQueue 或 Async Urlfetch 并行下载?

发布于 2024-09-15 06:32:35 字数 1288 浏览 7 评论 0原文

我的 Gae 应用程序从第三方站点检索 JSON 数据;给定一个代表要下载的项目的 ID,该网站上的项目数据被组织在多个页面中,因此我的代码必须逐页下载数据块,直到检索到最后一个可用页面的数据。
我的简化代码如下所示:

class FetchData(webapp.RequestHandler):
  def get(self):
    ...
    data_list = []
    page = 1
    while True:
      fetched_data= urlfetch.fetch('http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
      data_chunk = fetched_data["data"] 
      data_list = data_list + data_chunk
      if len(data_list) == int(fetched_data["total_pages"]):
         break
      else:
         page = page +1 
    ...  
    doRender('dataview.htm',{'data_list':data_list} )

data_list 结果是一个有序列表,其中第一项包含第 1 页的数据,最后一项包含最新页的数据;这个data_list一旦被检索,就会在视图中呈现。

这种方法在 99% 的情况下都有效,但有时,由于 Google App Engine 施加的 30 秒限制,在包含许多页面的项目上,我会收到可怕的 DeadlineExceededError 错误。 我想知道是否使用 TaskQueue |延期|AsyncUrlfetch 我可以改进此算法,以某种方式并行化 N 个 urlfetch 调用。

My Gae application retrieves JSON data from a third party site; given an ID representing the item to download , the item's data on this site is organized in multiple pages so my code has to download chunks of data, page after page, until the data of the last available page is retrieved.
My simplified code looks like this:

class FetchData(webapp.RequestHandler):
  def get(self):
    ...
    data_list = []
    page = 1
    while True:
      fetched_data= urlfetch.fetch('http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
      data_chunk = fetched_data["data"] 
      data_list = data_list + data_chunk
      if len(data_list) == int(fetched_data["total_pages"]):
         break
      else:
         page = page +1 
    ...  
    doRender('dataview.htm',{'data_list':data_list} )

The data_list results is an ordered list where the first item has data of page number 1 and the last item has data of the latest page; this data_list, once retrieved, is rendered in a view.

This approach works 99% of times but sometimes, due to the 30 seconds limit imposed by Google App Engine, on items with many pages i get the dreaded DeadlineExceededError.
I would like to know if using TaskQueue|Deferred|AsyncUrlfetch I could improve this algorithm parallelizing in some way the N urlfetch calls.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

世态炎凉 2024-09-22 06:32:35

使用此:http://code.google.com/appengine/docs /python/urlfetch/asynchronousrequests.html

这很简单,如下所示:

def handle_result(rpc):
    result = rpc.get_result()
    # ... Do something with result...

# Use a helper function to define the scope of the callback.
def create_callback(rpc):
    return lambda: handle_result(rpc)

rpcs = []
for url in urls:
    rpc = urlfetch.create_rpc()
    rpc.callback = create_callback(rpc)
    urlfetch.make_fetch_call(rpc, url)
    rpcs.append(rpc)

# ...

# Finish all RPCs, and let callbacks process the results.
for rpc in rpcs:
    rpc.wait()

Use this: http://code.google.com/appengine/docs/python/urlfetch/asynchronousrequests.html

Which is simple like so:

def handle_result(rpc):
    result = rpc.get_result()
    # ... Do something with result...

# Use a helper function to define the scope of the callback.
def create_callback(rpc):
    return lambda: handle_result(rpc)

rpcs = []
for url in urls:
    rpc = urlfetch.create_rpc()
    rpc.callback = create_callback(rpc)
    urlfetch.make_fetch_call(rpc, url)
    rpcs.append(rpc)

# ...

# Finish all RPCs, and let callbacks process the results.
for rpc in rpcs:
    rpc.wait()
征﹌骨岁月お 2024-09-22 06:32:35

我已经解决了这个问题:

chunks_dict = {}

def handle_result(rpc, page):
    result = rpc.get_result()
    chunks_dict[page] = result["data"]

def create_callback(rpc, page):
    return lambda: handle_result(rpc, page)

rpcs = []
while True:
    rpc = urlfetch.create_rpc(deadline = 10)
    rpc.callback = create_callback(rpc, page)
    urlfetch.make_fetch_call(rpc, 'http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
    rpcs.append(rpc)
    if page > total_pages:
       break
    else:
       page = page +1   
for rpc in rpcs:
    rpc.wait()

page_keys = chunks_dict.keys()
page_keys.sort()
for key in page_keys:
    data_list= data_list + chunks_dict[key]

I have resolved with this:

chunks_dict = {}

def handle_result(rpc, page):
    result = rpc.get_result()
    chunks_dict[page] = result["data"]

def create_callback(rpc, page):
    return lambda: handle_result(rpc, page)

rpcs = []
while True:
    rpc = urlfetch.create_rpc(deadline = 10)
    rpc.callback = create_callback(rpc, page)
    urlfetch.make_fetch_call(rpc, 'http://www.foo.com/getdata?id=xxx&result=JSON&page=%s' % page)
    rpcs.append(rpc)
    if page > total_pages:
       break
    else:
       page = page +1   
for rpc in rpcs:
    rpc.wait()

page_keys = chunks_dict.keys()
page_keys.sort()
for key in page_keys:
    data_list= data_list + chunks_dict[key]
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文