应用程序引擎上的带有游标问题的任务链。异常:查询偏移量太大。还有其他人遇到这个问题吗?

发布于 2024-09-14 06:34:10 字数 1935 浏览 1 评论 0原文

我不确定其他人是否有这个问题,但在 appengine 开发服务器上使用光标链接任务时,我遇到了异常“查询偏移量太大”(不确定是否发生在居住)。

在单个查询中处理了 4000 多条记录后请求游标时,会发生错误。

我不知道偏移量与光标有什么关系,也许它只是应用程序引擎 SDK 中的一个怪癖。

要解决此问题,请缩短任务推迟之前允许的时间(因此一次处理的记录数较少),或者在检查已过去的时间时,您还可以检查处理的记录数是否仍在范围内。例如,如果 time.time() > end_time 或 count == 2000。重置计数并推迟任务。 2000是一个任意数字,我不确定限制应该是多少。

编辑:

进行上述更改后,永远不会完成执行。正在调用 with_cursor(cursor) 代码,但似乎每次都从头开始。我错过了一些明显的东西吗?

导致异常的代码如下:

表“Transact”有 4800 行。当 time.time() > 时调用 transacts.cursor() 时会出现错误end_time 为真。请求游标时已处理了 4510 条记录,这似乎导致了错误(在开发服务器上,在其他地方没有测试过)。

def some_task(trans):
  tts = db.get(trans)
  for t in tts:
    #logging.info('in some_task')
    pass


def test_cursor(request):
  ret = test_cursor_task()


def test_cursor_task(cursor = None):
  startDate = datetime.datetime(2010,7,30)
  endDate = datetime.datetime(2010,8,30)
  end_time = time.time() + 20.0   
  transacts = Transact.all().filter('transactionDate >', startDate).filter('transactionDate <=',endDate)

  count =0
  if cursor:
      transacts.with_cursor(cursor)
  trans =[]
  logging.info('queue_trans')
  for tran in transacts:
    count+=1
    #trans.append(str(tran))   
    trans.append(str(tran.key()))   

    if len(trans)==20:
            deferred.defer(some_task, trans,  _countdown = 500)                
            trans =[]            
    if time.time() > end_time:
        logging.info(count)            
        if len(trans)>0:                
           deferred.defer(some_task, trans, _countdown = 500)
           trans =[]
        logging.info('time limit exceeded setting next call to queue')
        cursor = transacts.cursor()
        deferred.defer(test_cursor_task, cursor)
        logging.info('returning false')
        return False


    return True


  return HttpResponse('')

希望这对某人有帮助。

谢谢 伯特

I'm not sure if anyone else has this problem, but I'm getting an exception "Too big query offset" when using a cursor for chaining tasks on appengine development server (not sure if it happens on live).

The error occurs when requesting a cursor after 4000+ records have been processed in a single query.

I wasn't aware that offsets had anything to do with cursors, and perhaps its just a quirk in sdk for app engine.

To fix, either shorten the time allowed before task is deferred (so fewer records get processed at a time) or when checking time elapsed you can also check the number of records processed is still within range. e.g, if time.time() > end_time or count == 2000.Reset count and defer task. 2000 is an arbitrary number, I'm not sure what the limit should be.

EDIT:

After making the above mentioned changes, the never finishes executing. The with_cursor(cursor) code is being called, but seems to start at the beginning each time. Am I missing something obvious?

The code that causes the exception is as follows:

The table "Transact" has 4800 rows. The error occurs when transacts.cursor() is called when time.time() > end_time is true. 4510 records have been processed at the time when the cursor is requested, which seems to cause the error (on development server, haven't tested elsewhere).

def some_task(trans):
  tts = db.get(trans)
  for t in tts:
    #logging.info('in some_task')
    pass


def test_cursor(request):
  ret = test_cursor_task()


def test_cursor_task(cursor = None):
  startDate = datetime.datetime(2010,7,30)
  endDate = datetime.datetime(2010,8,30)
  end_time = time.time() + 20.0   
  transacts = Transact.all().filter('transactionDate >', startDate).filter('transactionDate <=',endDate)

  count =0
  if cursor:
      transacts.with_cursor(cursor)
  trans =[]
  logging.info('queue_trans')
  for tran in transacts:
    count+=1
    #trans.append(str(tran))   
    trans.append(str(tran.key()))   

    if len(trans)==20:
            deferred.defer(some_task, trans,  _countdown = 500)                
            trans =[]            
    if time.time() > end_time:
        logging.info(count)            
        if len(trans)>0:                
           deferred.defer(some_task, trans, _countdown = 500)
           trans =[]
        logging.info('time limit exceeded setting next call to queue')
        cursor = transacts.cursor()
        deferred.defer(test_cursor_task, cursor)
        logging.info('returning false')
        return False


    return True


  return HttpResponse('')

Hope this helps someone.

Thanks
Bert

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

摇划花蜜的午后 2024-09-21 06:34:10

在不使用 iter 功能的情况下再试一次:

#...
CHUNK = 500
objs = transacts.fetch(CHUNK)

for tran in objs:
    do_your_stuff

if len(objs) == CHUNK:
    deferred.defer(my_task_again, cursor=str(transacts.cursor()))

这对我有用。

Try this again without using the iter functionality:

#...
CHUNK = 500
objs = transacts.fetch(CHUNK)

for tran in objs:
    do_your_stuff

if len(objs) == CHUNK:
    deferred.defer(my_task_again, cursor=str(transacts.cursor()))

This works for me.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文