How to persist a large amount of data/rows in one entity using Google App Engine (Java) task queues

I'm trying to persist roughly 28,000 "rows" in a single entity, e.g. EMPLOYEE.

Basically, my goal is to avoid being terminated for exceeding the 30-second request limit, which is what would likely happen if I simply performed all 28,000 PUTs inside a single doPost() request to a servlet.

So I'm thinking of using the tasks described in the Google App Engine documentation.

Essentially, I would like to upload a CSV file with 28,000 "employees" into the war directory, then create a task that asynchronously PUTs those 28,000 employee rows into the EMPLOYEE entity, along the lines of the sketch below.
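
To make this concrete, here is a minimal sketch of the enqueueing side I have in mind: one task per slice of the file rather than one giant task. The class name, batch size, and the start/count parameters are placeholders I made up; /dotaskservlet is the worker servlet from my test:

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

public class EmployeeImportEnqueuer {
    // Enqueue one task per batch of CSV rows so each worker request
    // stays well under the request time limit. The batch size and the
    // start/count parameters are illustrative placeholders.
    public static void enqueueBatches(int totalRows, int batchSize) {
        Queue queue = QueueFactory.getDefaultQueue();
        for (int start = 0; start < totalRows; start += batchSize) {
            queue.add(TaskOptions.Builder
                    .withUrl("/dotaskservlet")
                    .param("start", Integer.toString(start))
                    .param("count", Integer.toString(batchSize)));
        }
    }
}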

  • Q1: Is this a viable solution, or is there a better way? Again, the goal is to perform the PUTs without being terminated due to the 30-second limit.

  • Q2: Also, what queue.xml configuration should I use to ensure I can perform these PUTs as fast as possible? (See the queue.xml sketch after this list.)

  • Q3: Now, I've tried it, similar to the blog entry http://gaejexperiments.wordpress.com/2009/11/24/episode-10-using-the-task-queue-service/, but I'm getting the following error after 23 or so seconds:

    SEVERE: Job default.task1 threw an unhandled Exception: 
    com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.
        at com.google.appengine.api.urlfetch.dev.LocalURLFetchService.fetch(LocalURLFetchService.java:236)
        at com.google.appengine.api.taskqueue.dev.LocalTaskQueue$UrlFetchServiceLocalTaskQueueCallback.execute(LocalTaskQueue.java:471)
        at com.google.appengine.api.taskqueue.dev.UrlFetchJob.execute(UrlFetchJob.java:77)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:203)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    16/02/2011 12:12:55 PM org.quartz.core.ErrorLogger schedulerError
    SEVERE: Job (default.task1 threw an exception.
    org.quartz.SchedulerException: Job threw an unhandled exception. [See nested exception: com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.]
        at org.quartz.core.JobRunShell.run(JobRunShell.java:214)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    * Nested Exception (Underlying Cause) ---------------
    com.google.apphosting.api.ApiProxy$ApplicationException: ApplicationError: 5: http method POST against URL http://127.0.0.1:8888/dotaskservlet timed out.
        at com.google.appengine.api.urlfetch.dev.LocalURLFetchService.fetch(LocalURLFetchService.java:236)
        at com.google.appengine.api.taskqueue.dev.LocalTaskQueue$UrlFetchServiceLocalTaskQueueCallback.execute(LocalTaskQueue.java:471)
        at com.google.appengine.api.taskqueue.dev.UrlFetchJob.execute(UrlFetchJob.java:77)
        at org.quartz.core.JobRunShell.run(JobRunShell.java:203)
        at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:520)
    
  • Q4: I've also checked the Datastore Viewer at http://localhost:8888/_ah/admin, and it seems to have created only 1000 results for that entity. Is 1000 the limit?

  • Q5: How do I get rid of the above error?

  • Q6: Can anyone confirm that the maximum allowed time for a task is 10 minutes, or is it still 30 seconds? I did come across this: http://code.google.com/appengine/docs/java/taskqueue/overview.html#Task_Execution
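
Regarding Q2, this is the sort of queue.xml I have been experimenting with; the queue name and the rate/bucket-size values are placeholders, not settings I know to be right:

<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
  <!-- A dedicated queue for the import. "rate" is how many tasks may
       start per second; "bucket-size" allows short bursts above that. -->
  <queue>
    <name>employee-import</name>
    <rate>20/s</rate>
    <bucket-size>40</bucket-size>
  </queue>
</queue-entries>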


Comments (3)

泛滥成性 2024-10-24 15:59:45

Here is an example/tutorial of using MapReduce to parse a CSV file, which seems similar to your needs:

http://ikaisays.com/2010/08/11/using-the-app-engine-mapper-for-bulk-data-import/

且行且努力 2024-10-24 15:59:45

If your goal is only to upload a bunch of data yourself, and not to allow your users to do so, I think an easier tool would be the bulk uploader. You can just run a Python program from your local machine that takes care of request limits and failure recovery for you.

http://ikaisays.com/2010/06/10/using-the-bulkloader-with-java-app-engine/
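
For reference, a typical upload invocation looks roughly like this; bulkloader.yaml, the CSV filename, the kind, and the app URL are placeholders, and the remote_api servlet needs to be mapped in your web.xml first:

appcfg.py upload_data \
    --config_file=bulkloader.yaml \
    --filename=employees.csv \
    --kind=Employee \
    --url=http://yourapp.appspot.com/remote_api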

夏九 2024-10-24 15:59:45

I would do this with batched saves via a DeferredTask, roughly something like this:

List<Employee> employees = ...
EmployeeWriter qr = new EmployeeWriter(employees);
TaskHandle task = QueueFactory.getDefaultQueue().add(withPayload(qr));

where

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.google.appengine.api.taskqueue.DeferredTask;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.common.base.Stopwatch;

import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withPayload;
import static com.googlecode.objectify.ObjectifyService.ofy;

public class EmployeeWriter implements DeferredTask {

    // DeferredTask serializes this object into the task payload,
    // so Employee must implement Serializable.
    private LinkedList<Employee> employees;

    public EmployeeWriter() { }

    public EmployeeWriter(List<Employee> employees) {
        this.employees = new LinkedList<Employee>(employees);
    }

    @Override
    public void run() {
        Stopwatch sw = Stopwatch.createStarted();
        try {
            // Save in batches of 100 until the list is drained or we
            // get close to the 10-minute task deadline.
            while (!employees.isEmpty() && sw.elapsed(TimeUnit.MINUTES) < 9) {
                ofy().save().entities(pullBatch(employees, 100)).now();
            }
        } finally {
            sw.stop();
            // If rows remain, re-enqueue this task (it still carries the
            // unsaved employees) to pick up where we left off.
            if (!employees.isEmpty()) {
                QueueFactory.getDefaultQueue().add(withPayload(this));
            }
        }
    }

    // Removes and returns up to max employees from the head of the list.
    private static List<Employee> pullBatch(LinkedList<Employee> source, int max) {
        List<Employee> batch = new LinkedList<Employee>();
        while (!source.isEmpty() && batch.size() < max) {
            batch.add(source.removeFirst());
        }
        return batch;
    }
}
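
One caveat with this approach: the whole remaining employee list is serialized into the task payload, so Employee must implement Serializable and the list has to fit within the task payload size limit; for very large imports you may prefer to store the rows elsewhere and pass only an offset in the task.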