Java +线程:并行处理线

发布于 2024-11-02 14:59:04 字数 1601 浏览 0 评论 0原文

我想并行处理大量独立的线。在下面的代码中,我创建了一个包含 POOL_SIZE 行的 NUM_THREAD 个主题池。 每个线程都启动,然后我使用“join”等待每个线程。

我想这是一个不好的做法,因为在这里,完成的线程将不得不在池中等待他的兄弟姐妹。

实现此代码的正确方法是什么?我应该使用哪些类?

谢谢 !

class FasterBin extends Thread
    {
    private List<String> dataRows=new ArrayList<String>();
    private Object result=null;
    @Override
    public void run()
        {
        for(String s:dataRows)
            {
            //Process item here (....)
            }
        }
    }


(...)

List<FasterBin> threads=new Vector<FasterBin>();
String line;
Iterator<String> iter=(...);
for(;;)
    {
    while(threads.size()< NUM_THREAD)
        {
        FasterBin bin=new FasterBin();
        while(
            bin.dataRows.size() < POOL_SIZE &&
            iter.hasNext()
            )
            {
            nRow++;
            bin.dataRows.add(iter.next());
            }
        if(bin.dataRows.isEmpty()) break;
        threads.add(bin);
        }
    if(threads.isEmpty()) break;


    for(FasterBin t:threads)
        {
        t.start();
        }
    for(FasterBin t:threads)
        {
        t.join();
        }
    for(FasterBin t:threads)
        {
        save(t.result);// ## do something with the result (save into a db etc...)
        }

    threads.clear();
    }

finally
    {
    while(!threads.isEmpty())
        {

        FasterBin b=threads.remove(threads.size()-1);
        try     {
            b.interrupt();
            }
        catch (Exception e)
            {
            }
        }
    }

I want to process a large number of independant lines in parallel. In the following code I'm creating a pool of NUM_THREAD Theads containing POOL_SIZE lines.
Each thread is started and I then wait for each thread using 'join'.

I guess it is a bad practice as here, a finished Thread will have to wait for his siblings in the pool.

What would be the correct way to implement this code ? Which classes should I use ?

Thanks !

class FasterBin extends Thread
    {
    private List<String> dataRows=new ArrayList<String>();
    private Object result=null;
    @Override
    public void run()
        {
        for(String s:dataRows)
            {
            //Process item here (....)
            }
        }
    }


(...)

List<FasterBin> threads=new Vector<FasterBin>();
String line;
Iterator<String> iter=(...);
for(;;)
    {
    while(threads.size()< NUM_THREAD)
        {
        FasterBin bin=new FasterBin();
        while(
            bin.dataRows.size() < POOL_SIZE &&
            iter.hasNext()
            )
            {
            nRow++;
            bin.dataRows.add(iter.next());
            }
        if(bin.dataRows.isEmpty()) break;
        threads.add(bin);
        }
    if(threads.isEmpty()) break;


    for(FasterBin t:threads)
        {
        t.start();
        }
    for(FasterBin t:threads)
        {
        t.join();
        }
    for(FasterBin t:threads)
        {
        save(t.result);// ## do something with the result (save into a db etc...)
        }

    threads.clear();
    }

finally
    {
    while(!threads.isEmpty())
        {

        FasterBin b=threads.remove(threads.size()-1);
        try     {
            b.interrupt();
            }
        catch (Exception e)
            {
            }
        }
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

过期以后 2024-11-09 14:59:04

不要自己做这一切!不要!要做到 1) 稳健和 2) 正确是极其困难的。

相反,重写您的内容以创建大量 Runnables 或 Callables,并使用合适的 ExecutorService 来让 Executor 以您想要的行为处理它们。

请注意,它保留在当前 JVM 内。如果您有多个可用的 JVM(在多台机器上),我建议您提出一个新问题。

Do NOT do all this by yourself! It is extremely hard to get 1) robust and 2) right.

Instead rewrite your stuff to create a lot of Runnables or Callables and use a suitable ExecutorService to get an Executor to process them with the behaviour you want.

Note that this stay inside the current JVM. If you have more than one JVM available (on multiple machines) I would recommend opening a new question.

云淡月浅 2024-11-09 14:59:04

java.util.concurrent.ThreadPoolExecutor。

        ThreadPoolExecutor  x=new ScheduledThreadPoolExecutor(10);
        x.execute(runnable);

有关概述,请参阅:Java util.concurrent API

java.util.concurrent.ThreadPoolExecutor.

        ThreadPoolExecutor  x=new ScheduledThreadPoolExecutor(10);
        x.execute(runnable);

See this for an overview: Java API for util.concurrent

往事随风而去 2024-11-09 14:59:04

实际上不鼓励直接使用线程 - 查看 java.util.concurrent 包,您会发现应该使用 ThreadPools 和 Futures 来代替。

Thread.join 并不意味着该线程等待其他线程,它意味着您的主线程等待列表中的线程之一死亡。在这种情况下,您的主线程等待最慢的工作线程完成。我认为这种方法没有问题。

Direct use of Threads is actually discouraged - look at the package java.util.concurrent, you'll find there ThreadPools and Futures which should be used instead.

Thread.join doesn't mean that the Thread waits for others, it means your main Thread waits for one of the Thread in list to die. In this case your main Thread waits for the slowiest working Thread to finish. I don't see a problem with this approach.

抚你发端 2024-11-09 14:59:04

是的,从某种意义上说,一个已完成的线程必须等待池中的同级线程:当一个线程完成时,它会停止,并且不会帮助其他线程更快地完成。更好的说法是,整个工作等待工作时间最长的线程。

这是因为每个线程只有一个任务。您最好创建比线程数量多得多的任务,并将它们全部放在一个队列中。让所有工作线程循环地从该队列中获取任务。那么所有线程的时间差将大致是执行一项任务的时间,由于任务很小,所以这个时间很小。

您可以自己启动工作线程池,也可以将每个任务包装在 Runnable 中并将它们提交到标准线程池 - 这没有什么区别。

Yes, in some sense, a finished Thread would have to wait for his siblings in the pool: when a thread finishes, it stops, and does not help other threads to finish sooner. Better say, the whole work waits for the thread which works for the longest time.

This is because each thread has exactly one task. You better create many tasks, much more than the number of threads, and put them all in a single queue. Let all working threads take their tasks from that queue in a loop. Then the difference in time for all threads would be roughly the time to execute one task, which is small because tasks are small.

You can start the pool of working threads yourself, or you can wrap each task in a Runnable and submit them to a standard thread pool - this makes no difference.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文