JavaEE:在线程中构建和刷新响应

发布于 2024-10-21 16:22:49 字数 3389 浏览 1 评论 0原文

我正在尝试编写一个线程化的 Java EE servlet。
每个线程将进行一些处理并将其结果写入单个 HTTP 块中(使用 HTTP 传输编码:分块)。
每个块一旦可用就会被发送到客户端。
我在每个线程中写入和刷新输出时遇到问题。
举例来说,下面的代码生成 5 个线程,这些线程生成随机字符串,将其写入输出缓冲区并通过同步函数调用刷新该缓冲区。

/**
 * Test servlet from the question: writes a header line, submits five tasks to a
 * thread pool that each print one random line to the response, waits for the pool
 * to terminate, then writes a footer line and closes the writer.
 */
public class TestServer extends HttpServlet {
   // Response writer kept in an instance field; doGet() reassigns it on every call
   // and display() reads it from the worker threads.
   private PrintWriter m_out;

   public TestServer() {
      super();
   }

   /**
    * Handles a GET request by printing "gonna start", running five random-string
    * tasks on a freshly created pool, and printing "ok we done" afterwards.
    *
    * @param request  the incoming request (not read by this method)
    * @param response the response whose writer receives all output
    */
   protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
      m_out = response.getWriter();

      m_out.print("gonna start\n");
      m_out.flush();

      // A new 10-thread pool is created per request; only 5 tasks are submitted to it.
      ThreadPoolExecutor thread_pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
      for(int i = 0; i < 5; i++ ) {
         thread_pool.execute(new Runnable() {
        // NOTE(review): every Runnable instance creates its own ReentrantLock, so the
        // five tasks never contend on the same lock and their display() calls are
        // not serialized against each other.
        private ReentrantLock m_lock = new ReentrantLock();
        @Override
        public void run() {
               SecureRandom random = new SecureRandom();
               m_lock.lock();
               try {
                  // Random 1300-bit value rendered in base 32, followed by a newline.
                  display(new BigInteger(1300, random).toString(32)+"\n");
               }
               finally {
                  m_lock.unlock();
               }
            }
         });
      }
      // Stop accepting new tasks, then wait up to 120 seconds for the submitted ones.
      // The boolean result of awaitTermination (timed out or not) is ignored.
      thread_pool.shutdown();
      try {
         thread_pool.awaitTermination(120, TimeUnit.SECONDS);
      }
      catch (InterruptedException e1) {
         log("thread interrupted");
         e1.printStackTrace();
      }
      m_out.print("ok we done\n");
      m_out.close();
   }

   /**
    * Prints the given text to the shared writer and flushes it.
    * Called from the pool threads; the method itself takes no lock.
    *
    * @param content text to write to the response
    */
   void display(String content) {
      m_out.print(content);
      m_out.flush();
   }
}

有时,此代码将生成以下输出:

Server: Apache-Coyote/1.1
Transfer-Encoding: chunked
Date: Mon, 14 Mar 2011 22:26:15 GMT
Connection: close

gonna start
fmclslvi1p8pmpbnfg8ikfpp3uce5db0ncoms3d4i3tjnbfmrab9nr2cue0to8hra8s87f7r1v2ff2n3c5s68cbqnib3cmn21ddn45k829po4qkgaqkgsdr35uuqqhskvfdj4psldgp58r63g85kbjnn0i53d3sg5kibbe4tsigcavbp2oeee5s63ro68k2eu74t237jghsmtsaocpo771jjtqdh071ogtfp7bel1d6mlj63f88o2adg88uc07l8f62r
fmclslvi1p8pmpbnfg8ikfpp3uce5db0ncoms3d4i3tjnbfmrab9nr2cue0to8hra8s87f7r1v2ff2n3c5s68cbqnib3cmn21ddn45k829po4qkgaqkgsdr35uuqqhskvfdj4psldgp58r63g85kbjnn0i53d3sg5kibbe4tsigcavbp2oeee5s63ro68k2eu74t237jghsmtsaocpo771jjtqdh071ogtfp7bel1d6mlj63f88o2adg88uc07l8f62r
5jf605re18gpuqpvr30gfku1l905edlq7rqrslrh0mkubvlj91crk18htfnbeni3r7c36eukffl04sppprqdco6k23dm0mev9noa5f75frjls40tigeduo2jmfe3f8bneja11nn09giplo0kpn45tnm4etp9jhp5h42gim1ia9lm8s5c6m39gm7h0mcgpj0fogmo2bo319tlgppuhcp6t887s5jvm52fg191mc55cgb7inlir8ail3gbnsdea46vj041
jggsrderm9rpegoq9gapjl4dk14ubc4mb0nfq2bdkqtrdvuihv8q8f0dbqm63n7ojghcchb4c7gbp77011fl8hrfrl0mf9c3mfvhk1acfuofedjido44247ffs8lt28304o8i1luumbnbvbdrqdl8nno4b3t0pul8ep0t6vsgl7bp64pnlfiisepvq1urujtsbbou6batft6ide1qu8ps783qhn07v8n3i2jdj20v599np1ahrkcso9is0g1ja1e31fl
4h1rkt6n478h83db250ldqt517dn40ae5960qmsr5nnsqlv5av2eqmn61sruug2etqjn7h4l5lh2i9i0q1s08d5f6kpmaep8oms77u91gvfop1j2vds6v6o85fbn3cmi27it9l7ogrm305eunsrdgnblhn2bvisa2vkhhi469mj22oklgc3sqcd59eh6e52uhop60vjdq20qcq9up825v446eukk19jmoi2sisnka03kagb7ueaqmdq6rd9lq0g25l20
n1hv30da7d7naqs34r0hgf5r7r75gr8u98vko7krfdogka2d0t0pb7koiuojdm0rn3uc99g9a8epf81e1rkgn98fho2vih003vt759d8asoou9qmnmu5gluejlarbalcf4dk2kic9l9fhhv45vghcuu8tfppo2dapvrnafoetuqlgt4eb6o997iod3fcd5jpfqa47mhnfd5p81fudfcvqej6t69q7hv38nqbgdgtjj73thn534g1j3pc4as8c4dkj8dj
ok we done

您会看到第一个动态生成的行被重复。
这似乎表明PrintWriter.flush()方法无法同步。

为什么这不起作用?我可以做什么来实现我的目标?

谢谢。

I am trying to write a threaded Java EE servlet.
Each thread will do some processing and write its results in a single HTTP chunk (using HTTP Transfer-encoding: chunked).
Each chunk will be sent to the client as soon as it is available.
I am having issues writing and flushing the output within each thread.
For the sake of example, the code below spawns 5 threads that generate a random string, write it to the output buffer and flush that buffer through a synchronized function call.

/**
 * Test servlet from the question: writes a header line, submits five tasks to a
 * thread pool that each print one random line to the response, waits for the pool
 * to terminate, then writes a footer line and closes the writer.
 */
public class TestServer extends HttpServlet {
   // Response writer kept in an instance field; doGet() reassigns it on every call
   // and display() reads it from the worker threads.
   private PrintWriter m_out;

   public TestServer() {
      super();
   }

   /**
    * Handles a GET request by printing "gonna start", running five random-string
    * tasks on a freshly created pool, and printing "ok we done" afterwards.
    *
    * @param request  the incoming request (not read by this method)
    * @param response the response whose writer receives all output
    */
   protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
      m_out = response.getWriter();

      m_out.print("gonna start\n");
      m_out.flush();

      // A new 10-thread pool is created per request; only 5 tasks are submitted to it.
      ThreadPoolExecutor thread_pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
      for(int i = 0; i < 5; i++ ) {
         thread_pool.execute(new Runnable() {
        // NOTE(review): every Runnable instance creates its own ReentrantLock, so the
        // five tasks never contend on the same lock and their display() calls are
        // not serialized against each other.
        private ReentrantLock m_lock = new ReentrantLock();
        @Override
        public void run() {
               SecureRandom random = new SecureRandom();
               m_lock.lock();
               try {
                  // Random 1300-bit value rendered in base 32, followed by a newline.
                  display(new BigInteger(1300, random).toString(32)+"\n");
               }
               finally {
                  m_lock.unlock();
               }
            }
         });
      }
      // Stop accepting new tasks, then wait up to 120 seconds for the submitted ones.
      // The boolean result of awaitTermination (timed out or not) is ignored.
      thread_pool.shutdown();
      try {
         thread_pool.awaitTermination(120, TimeUnit.SECONDS);
      }
      catch (InterruptedException e1) {
         log("thread interrupted");
         e1.printStackTrace();
      }
      m_out.print("ok we done\n");
      m_out.close();
   }

   /**
    * Prints the given text to the shared writer and flushes it.
    * Called from the pool threads; the method itself takes no lock.
    *
    * @param content text to write to the response
    */
   void display(String content) {
      m_out.print(content);
      m_out.flush();
   }
}

Once in a while, this code will generate the following output:

Server: Apache-Coyote/1.1
Transfer-Encoding: chunked
Date: Mon, 14 Mar 2011 22:26:15 GMT
Connection: close

gonna start
fmclslvi1p8pmpbnfg8ikfpp3uce5db0ncoms3d4i3tjnbfmrab9nr2cue0to8hra8s87f7r1v2ff2n3c5s68cbqnib3cmn21ddn45k829po4qkgaqkgsdr35uuqqhskvfdj4psldgp58r63g85kbjnn0i53d3sg5kibbe4tsigcavbp2oeee5s63ro68k2eu74t237jghsmtsaocpo771jjtqdh071ogtfp7bel1d6mlj63f88o2adg88uc07l8f62r
fmclslvi1p8pmpbnfg8ikfpp3uce5db0ncoms3d4i3tjnbfmrab9nr2cue0to8hra8s87f7r1v2ff2n3c5s68cbqnib3cmn21ddn45k829po4qkgaqkgsdr35uuqqhskvfdj4psldgp58r63g85kbjnn0i53d3sg5kibbe4tsigcavbp2oeee5s63ro68k2eu74t237jghsmtsaocpo771jjtqdh071ogtfp7bel1d6mlj63f88o2adg88uc07l8f62r
5jf605re18gpuqpvr30gfku1l905edlq7rqrslrh0mkubvlj91crk18htfnbeni3r7c36eukffl04sppprqdco6k23dm0mev9noa5f75frjls40tigeduo2jmfe3f8bneja11nn09giplo0kpn45tnm4etp9jhp5h42gim1ia9lm8s5c6m39gm7h0mcgpj0fogmo2bo319tlgppuhcp6t887s5jvm52fg191mc55cgb7inlir8ail3gbnsdea46vj041
jggsrderm9rpegoq9gapjl4dk14ubc4mb0nfq2bdkqtrdvuihv8q8f0dbqm63n7ojghcchb4c7gbp77011fl8hrfrl0mf9c3mfvhk1acfuofedjido44247ffs8lt28304o8i1luumbnbvbdrqdl8nno4b3t0pul8ep0t6vsgl7bp64pnlfiisepvq1urujtsbbou6batft6ide1qu8ps783qhn07v8n3i2jdj20v599np1ahrkcso9is0g1ja1e31fl
4h1rkt6n478h83db250ldqt517dn40ae5960qmsr5nnsqlv5av2eqmn61sruug2etqjn7h4l5lh2i9i0q1s08d5f6kpmaep8oms77u91gvfop1j2vds6v6o85fbn3cmi27it9l7ogrm305eunsrdgnblhn2bvisa2vkhhi469mj22oklgc3sqcd59eh6e52uhop60vjdq20qcq9up825v446eukk19jmoi2sisnka03kagb7ueaqmdq6rd9lq0g25l20
n1hv30da7d7naqs34r0hgf5r7r75gr8u98vko7krfdogka2d0t0pb7koiuojdm0rn3uc99g9a8epf81e1rkgn98fho2vih003vt759d8asoou9qmnmu5gluejlarbalcf4dk2kic9l9fhhv45vghcuu8tfppo2dapvrnafoetuqlgt4eb6o997iod3fcd5jpfqa47mhnfd5p81fudfcvqej6t69q7hv38nqbgdgtjj73thn534g1j3pc4as8c4dkj8dj
ok we done

You can see that the first dynamically generated line is repeated.
This seems to indicate that the PrintWriter.flush() method cannot be synchronized.

Why is this not working, and what can I do to achieve my goal?

Thank you.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

梦行七里 2024-10-28 16:22:49

供将来参考,我想我已经用下面的代码让它正常工作了:

@SuppressWarnings("serial")
public class TestServer extends HttpServlet {
   private final int THREADPOOL_SIZE = 10;
   private ThreadPoolExecutor thread_pool;

   public void init(ServletConfig config) throws ServletException {
      super.init(config);
      thread_pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREADPOOL_SIZE);
   }

   public void destroy() {
      thread_pool.shutdownNow();
   }

   public TestServer() {
          super();
   }

   protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
      CompletionService<String> completion_service_pool = new ExecutorCompletionService<String>(thread_pool);
      int task_num = 10;
      final int line_length = 13000;
      for(int i = 0; i < task_num; i++) {
         completion_service_pool.submit(new Callable<String>(){
            @Override
            public String call() {
               Random rn = new Random();
               int sleep_seconds = Math.abs(rn.nextInt() % 10);
               try {
                  Thread.sleep(sleep_seconds * 1000);
               }
               catch (Exception e) {

               }
               return String.format("Slept for %d seconds. %s<br>\n", sleep_seconds, new BigInteger(line_length, rn).toString(32));
            }
         });
      }

      PrintWriter out = response.getWriter();
      response.setContentType("text/html");
      out.print("gonna start<br>\n");
      out.flush();

      for(int i = 0; i < task_num; ) {
         try {
            Future<String> result_task = completion_service_pool.poll();
            if(result_task != null) {
               String result_string = result_task.get();
               out.print(result_string);
               out.flush();
               i++;
            }
         }
         catch (Exception e) {
         }
      }

      out.print("ok we done<br>\n");
      out.close();
   }
}

For future reference, I think I got it to work with the following code:

@SuppressWarnings("serial")
public class TestServer extends HttpServlet {
   private final int THREADPOOL_SIZE = 10;
   private ThreadPoolExecutor thread_pool;

   public void init(ServletConfig config) throws ServletException {
      super.init(config);
      thread_pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREADPOOL_SIZE);
   }

   public void destroy() {
      thread_pool.shutdownNow();
   }

   public TestServer() {
          super();
   }

   protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
      CompletionService<String> completion_service_pool = new ExecutorCompletionService<String>(thread_pool);
      int task_num = 10;
      final int line_length = 13000;
      for(int i = 0; i < task_num; i++) {
         completion_service_pool.submit(new Callable<String>(){
            @Override
            public String call() {
               Random rn = new Random();
               int sleep_seconds = Math.abs(rn.nextInt() % 10);
               try {
                  Thread.sleep(sleep_seconds * 1000);
               }
               catch (Exception e) {

               }
               return String.format("Slept for %d seconds. %s<br>\n", sleep_seconds, new BigInteger(line_length, rn).toString(32));
            }
         });
      }

      PrintWriter out = response.getWriter();
      response.setContentType("text/html");
      out.print("gonna start<br>\n");
      out.flush();

      for(int i = 0; i < task_num; ) {
         try {
            Future<String> result_task = completion_service_pool.poll();
            if(result_task != null) {
               String result_string = result_task.get();
               out.print(result_string);
               out.flush();
               i++;
            }
         }
         catch (Exception e) {
         }
      }

      out.print("ok we done<br>\n");
      out.close();
   }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文