GAE:带有测试床的单元测试任务队列

发布于 2024-11-18 23:37:39 字数 125 浏览 2 评论 0原文

我正在使用测试床对我的谷歌应用程序引擎应用程序进行单元测试,并且我的应用程序使用任务队列。

当我在单元测试期间将任务提交到任务队列时,看起来该任务在队列中,但该任务并未执行。

如何在单元测试期间执行任务?

I'm using testbed to unit test my google app engine app, and my app uses a taskqueue.

When I submit a task to a taskqueue during a unit test, it appears that the task is in the queue, but the task does not execute.

How do I get the task to execute during a unit test?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

谜泪 2024-11-25 23:37:39

使用 Saxon 的出色答案,我可以使用 testbed 而不是 gaetestbed 来做同样的事情。这就是我所做的。

将其添加到我的 setUp() 中:

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

然后,在我的测试中,我使用了以下内容:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

在此过程中,POST 参数进行了 Base64 编码,因此必须撤消该操作才能使其正常工作。

我比 Saxon 的答案更喜欢这个,因为我可以使用官方测试平台包,并且可以在我自己的测试代码中完成这一切。

编辑:我后来想对使用延迟库提交的任务做同样的事情,并且花了一些时间才弄清楚,所以我在这里分享是为了减轻其他人的痛苦。

如果您的任务队列仅包含使用延迟提交的任务,那么这将运行所有任务以及这些任务排队的任何任务:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")

Using Saxon's excellent answer, I was able to do the same thing using testbed instead of gaetestbed. Here is what I did.

Added this to my setUp():

    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

Then, in my test, I used the following:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

Somewhere along the line, the POST parameters get base64 encoded so had to undo that to get it to work.

I like this better than Saxon's answer since I can use the official testbed package and I can do it all within my own test code.

EDIT: I later wanted to do the same thing with tasks submitted using the deferred library, and it took a bit of headbanging to figure it, so I'm sharing here to ease other people's pain.

If your taskqueue contains only tasks submitted with deferred, then this will run all of the tasks and any tasks queued by those tasks:

def submit_deferred(taskq):
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            (func, args, opts) = pickle.loads(base64.b64decode(task["body"]))
            func(*args)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")
守护在此方 2024-11-25 23:37:39

实现此目的的另一个(更干净的)选项是使用测试台内的任务队列存根。为此,您首先必须通过将以下内容添加到 setUp() 方法来初始化任务队列存根:

self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()

可以使用以下代码访问任务调度程序:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

使用队列存根的接口是如下:

GetQueues() #returns a list of dictionaries with information about the available queues

#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

DeleteTask(queue_name, task_name) #removes the task task_name from the given queue

FlushQueue(queue_name) #removes all the tasks from the queue

#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)

StartBackgroundExecution() #Executes the queued tasks

Shutdown() #Requests the task scheduler to shutdown.

此外,由于它使用 App Engine SDK 自己的设施 - 它与延迟库一起工作得很好。

Another (cleaner) option to achieve this is to use the task queue stub within the testbed. To do this you first have to initialize the task queue stub by adding the following to your setUp() method:

self.testbed = init_testbed()
self.testbed.init_taskqueue_stub()

The tasks scheduler can be accessed using the following code:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

The interface for working with the queue stub is as follows:

GetQueues() #returns a list of dictionaries with information about the available queues

#returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

DeleteTask(queue_name, task_name) #removes the task task_name from the given queue

FlushQueue(queue_name) #removes all the tasks from the queue

#returns tasks filtered by name & url pointed to by the task from the given queues
get_filtered_tasks(url, name, queue_names)

StartBackgroundExecution() #Executes the queued tasks

Shutdown() #Requests the task scheduler to shutdown.

Also, as this uses App Engine SDK own facilities - it works just fine with the deferred library.

何以笙箫默 2024-11-25 23:37:39

开发应用程序服务器是单线程的,因此当前台线程运行测试时,它无法在后台运行任务。

我修改了 gaetestbed 中的 taskqueue.py 中的 TaskQueueTestCase 以添加以下函数:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the 
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

为了使此功能正常工作,我还必须将 TaskQueueTestCase 的基类从 BaseTestCase 更改为 WebTestCase。

然后,我的测试会执行如下操作:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

因此,这会直接从前台单元测试执行任务。这与生产中不太一样(即,任务将在“一段时间后”根据单独的请求执行),但它对我来说已经足够好了。

The dev app server is single-threaded, so it can't run tasks in the background while the foreground thread is running the tests.

I modified TaskQueueTestCase in taskqueue.py in gaetestbed to add the following function:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the 
    queue.
    The tasks are execute against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

For this to work, I also had to change the base class of TaskQueueTestCase from BaseTestCase to WebTestCase.

My tests then do something like this:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertTrue(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

This therefore executes the task directly from the foreground unit test. This is not quite the same as in production (ie, the task will get executed 'some time later' on a separate request), but it works well enough for me.

温柔女人霸气范 2024-11-25 23:37:39

您可能想尝试以下代码。完整说明在这里: http://www.geewax .org/task-queue-support-in-app-engines-ext-testbed/

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    taskqueue.add(url='/path/to/task')

    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

unittest.main()

You might want to try the following code. Full explanation is here: http://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    self.task_queue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    taskqueue.add(url='/path/to/task')

    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

unittest.main()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文