500个工作线程,什么样的线程池?

发布于 2024-09-02 07:17:00 字数 995 浏览 7 评论 0 原文

我想知道这是否是最好的方法。我有大约 500 个无限期运行的线程,但在完成一个处理周期后 Thread.sleep 一分钟。

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

正在执行的代码非常简单,基本上就是这样,

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

我确实需要为每个正在运行的任务分配一个单独的线程,因此更改架构不是一个选择。我尝试使我的 threadPool 大小等于 Runtime.getRuntime().availableProcessors() ,它尝试运行所有 500 个线程,但只让其中 8 个(4x超线程)执行。其他线程不会投降并让其他线程轮流执行。我尝试添加 wait() 和 notification(),但仍然没有成功。如果有人有一个简单的例子或一些提示,我将不胜感激!

嗯,这个设计可以说是有缺陷的。线程实现遗传编程或 GP,一种学习算法。每个线程都会分析高级趋势并做出预测。如果线程完成,学习的内容就会丢失。也就是说,我希望 sleep() 能够让我在一个线程不“学习”时共享一些资源

所以实际要求是

如何安排维护任务 状态并每 2 分钟运行一次,但是 控制一次执行多少个。

I am wondering if this is the best way to do this. I have about 500 threads that run indefinitely, but Thread.sleep for a minute when done one cycle of processing.

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

The code that is executing is really simple and basically just this

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

I do need a separate threads for each running task, so changing the architecture isn't an option. I tried making my threadPool size equal to Runtime.getRuntime().availableProcessors() which attempted to run all 500 threads, but only let 8 (4xhyperthreading) of them execute. The other threads wouldn't surrender and let other threads have their turn. I tried putting in a wait() and notify(), but still no luck. If anyone has a simple example or some tips, I would be grateful!

Well, the design is arguably flawed. The threads implement Genetic-Programming or GP, a type of learning algorithm. Each thread analyzes advanced trends makes predictions. If the thread ever completes, the learning is lost. That said, I was hoping that sleep() would allow me to share some of the resources while one thread isn't "learning"

So the actual requirements are

how can I schedule tasks that maintain
state and run every 2 minutes, but
control how many execute at one time.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(11

心房敞 2024-09-09 07:17:00

如果您的线程没有终止,这是线程内代码的错误,而不是线程池的错误。要获得更详细的帮助,您需要发布正在执行的代码。

另外,为什么每个线程完成后都让其休眠?让它完成不是更好吗?

此外,我认为您通过使线程数等于您希望执行的任务数来滥用线程池。线程池的要点是对使用的资源数量进行限制;这种方法并不比根本不使用线程池好。

最后,您不需要将 Thread 实例传递给 ExecutorService,只需传递 Runnable 实例即可。 ExecutorService 维护自己的线程池,这些线程无限循环,从内部队列中提取工作(工作是您提交的 Runnable )。

If your threads are not terminating, this is the fault of the code within the thread, not the thread pool. For more detailed help you will need to post the code that is being executed.

Also, why do you put each Thread to sleep when it is done; wouldn't it be better just to let it complete?

Additionally, I think you are misusing the thread pool by having a number of threads equal to the number of tasks you wish to execute. The point of a thread pool is to put a constraint on the number of resources used; this approach is no better than not using a thread pool at all.

Finally, you don't need to pass instances of Thread to your ExecutorService, just instances of Runnable. ExecutorService maintains its own pool of threads which loop indefinitely, pulling work off of an internal queue (the work being the Runnables you submit).

缘字诀 2024-09-09 07:17:00

为什么不使用 ScheduledExecutorService安排每个任务每分钟运行一次,而不是让所有这些线程空闲一整分钟?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

“改变架构不是一个选择”是什么意思?如果您的意思是您根本无法修改您的任务(具体来说,任务必须循环,而不是运行一次,并且调用 Thread.sleep()无法删除),那么“良好的性能也不是一个选择”。

Why not used a ScheduledExecutorService to schedule each task to run once per minute, instead of leaving all these threads idle for a full minute?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

What do you mean by, "changing the architecture isn't an option"? If you mean that you can't modify your task at all (specifically, the tasks have to loop, instead of running once, and the call to Thread.sleep() can't be removed), then "good performance isn't an option," either.

池木 2024-09-09 07:17:00

我不确定您的代码在如何使用线程池方面在语义上是否正确。 ExecutionService 在内部创建和管理线程,客户端只需提供一个 Runnable 实例,其 run() 方法将在池线程之一的上下文中执行。您可以查看我的示例。另请注意,每个正在运行的线程占用约 10Mb 的系统内存用于堆栈,并且在 Linux 上,java 到本机线程的映射是 1 对 1。

I'm not sure your code is semantically correct in how it's using a thread pool. ExecutionService creates and manages threads internally, a client should just supply an instance of Runnable, whose run() method will be executed in context of one of pooled threads. You can check my example. Also note that each running thread takes ~10Mb of system memory for the stack, and on linux the mapping of java-to-native threads is 1-to-1.

鸢与 2024-09-09 07:17:00

您应该让它返回并使用 ThreadPoolexecutor 执行每分钟发布到工作队列的工作。

Instead of putting a tread to sleep you should let it return and use a ThreadPoolexecutor to execute work posted every minute to your work queue.

亣腦蒛氧 2024-09-09 07:17:00

回答你的问题,什么类型的线程池?

我发表了我的评论,但这确实应该解决您的问题。您的计算可能需要 2 秒才能完成。您有许多任务 (500) 希望尽快完成。假设没有 IO 和/或网络流量,您可以实现的最快吞吐量是使用 Runtime.getRuntime().availableProcessors() 线程数。

如果将线程数增加到 500 个,则每个任务都将在自己的线程上执行,但操作系统会每隔一段时间调度一个线程以分配给另一个线程。也就是说,在任何给定点都会进行 125 次上下文切换。每次上下文切换都会增加每个任务运行的时间。

这里的大局是,当处理器数量远远超过时,添加更多线程并不等于更高的吞吐量。

编辑:快速更新。你不需要睡在这里。当您使用 8 个处理器执行 500 个任务时,每个任务将在 2 秒内完成,完成后,它运行的线程将执行下一个任务并完成该任务。

To answer your question, what type of thread pool?

I posted my comments but this really should address your issue. You have a computation that can take 2 seconds to complete. You have many tasks (500) that you want to be completed as fast as possible. The fastest possible throughput you can achieve, assuming there is no IO and or network traffic, is with Runtime.getRuntime().availableProcessors() number of threads.

If you increase your number to 500 threads, then each task will be executing on its own thread, but the OS will schedule a thread out every so often to give to another thread. Thats 125 context switches at any given point. Each context switch will increase the amount of time for each task to run.

The big picture here is that adding more threads does NOT equal greater throughput when you are way over the number of processors.

Edit: A quick update. You dont need to sleep here. When you execute the 500 tasks with 8 processors, each task will complete in the 2 seconds, finish and the thread it was running on will then take the next task and complete that one.

顾北清歌寒 2024-09-09 07:17:00

8 个线程是您的系统可以处理的最大值,如果超过 8 个线程,上下文切换就会减慢您的速度。

看看这篇文章 http://www.informit.com/ articles/article.aspx?p=1339471&seqNum=4 它将为您提供如何操作的概述。

8 Threads is the max that your system can handle, any more and you are slowing yourself down with context switching.

Look at this article http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 It will give you an overview of how to do it.

莫言歌 2024-09-09 07:17:00

这应该做你想要的,但不是你要求的:-)你必须取出 Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

这会每 10 秒安排一次 Runnables 来显示行为。
如果您确实需要在处理完成后等待固定的时间,您可能需要尝试使用您需要的 .scheduleXXX 方法。我认为,fixedWait 只会每 N 次运行一次,无论执行时间是多少。

This should do what you desire, but not what you asked for :-) You have to take out the Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

This schedules the Runnables every 10 SECONDS to show the behavior.
If you really need to wait a fixed amount of time AFTER processing is complete you might need to play around with which .scheduleXXX method that you need. I think fixedWait will just run it every N amount of time regardless of what the execution time is.

阳光①夏 2024-09-09 07:17:00

我确实需要为每个正在运行的任务分配一个单独的线程,因此更改架构不是一个选择。

如果这是真的(例如,调用外部阻塞函数),则为它们创建单独的线程并启动它们。您无法创建具有有限数量线程的线程池,因为其中一个线程中的阻塞函数将阻止将任何其他可运行对象放入其中,并且创建每个任务只有一个线程的线程池不会获得太多好处。

我尝试使我的 threadPool 大小等于 Runtime.getRuntime().availableProcessors() ,它尝试运行所有 500 个线程,但只让其中 8 个(4x 超线程)执行。

当您将创建的 Thread 对象传递给线程池时,它只会看到它们实现了 Runnable。因此,它将运行每个 Runnable 直至完成。任何停止 run() 方法返回的循环都将不允许下一个排队任务运行;例如:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

打印出:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

显示第一个(线程池大小)任务在计划下一个任务之前运行完成。

您需要做的是创建运行一段时间的任务,然后让其他任务运行。如何构建这些取决于您想要实现的目标,

  • 是希望所有任务同时运行、所有任务等待一分钟,然后再次同时运行,还是任务之间不同步其他
  • 是否确实希望每个任务以一分钟的间隔运行
  • 是否您的任务可能会阻塞,因此确实需要单独的线程
  • 如果任务阻塞时间超过了运行的预期窗口,
  • 则预期会出现什么行为 如果任务阻塞时间长于重复率(阻塞时间超过一分钟)

根据这些问题的答案,可以使用 ScheduledExecutorService、信号量或互斥体的某种组合来协调任务。最简单的情况是非阻塞、非同步任务,在这种情况下,直接使用 ScheduledExecutorService 每分钟运行一次可运行对象。

I do need a separate threads for each running task, so changing the architecture isn't an option.

If that is true (for example, making a call to an external blocking function), then create separate threads for them and start them. You can't create a thread pool with a limited number of threads, as a blocking function in one of threads will prevent any other runnable being put into it, and don't gain much creating a thread pool with one thread per task.

I tried making my threadPool size equal to Runtime.getRuntime().availableProcessors() which attempted to run all 500 threads, but only let 8 (4xhyperthreading) of them execute.

When you pass the Thread objects you are creating to thread pool, it only sees that they implement Runnable. Therefore it will run each Runnable to completion. Any loop which stops the run() method returning will not allow the next enqueued task to run; eg:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

prints out:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

showing that the first (thread pool size) tasks run to completion before the next tasks get scheduled.

What you need to do is create tasks which run for a while, then let other tasks run. Quite how you structure these depends on what you want to achieve

  • whether you want all the tasks to run at the same time, the all wait for a minute, then all run at the same time again, or whether the tasks are not synchronised with each other
  • whether you really wanted each task to run at a one-minute interval
  • whether your tasks are potentially blocking or not, and so really require separate threads
  • what behaviour is expected if a task blocks longer than the expected window for running
  • what behaviour is expected if a task blocks longer than the repeat rate (blocks for more than one minute)

Depending on the answers to these, some combination of ScheduledExecutorService, semaphores or mutexes can be used to co-ordinate the tasks. The simplest case is the non-blocking, non-synchronous tasks, in which case use a ScheduledExecutorService directly to run your runnables once every minute.

南城追梦 2024-09-09 07:17:00

您可以重写您的项目以使用一些基于代理的并发框架,例如 Akka 吗?

Can you rewrite your project for using some agent-based concurrency framework, like Akka?

澜川若宁 2024-09-09 07:17:00

通过将线程数量减少到系统实际可以处理的数量,您当然可以发现吞吐量有所提高。您愿意稍微改变一下螺纹的设计吗?它将减轻调度程序的负担,将休眠线程放入队列中,而不是实际拥有数百个休眠线程。

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

这保留了“作业无限重复,但在执行之间至少等待一分钟”的核心语义,但现在您可以将线程池​​调整为机器可以处理的内容,而那些不工作的线程则在队列中,而不是徘徊在调度程序中作为休眠线程。如果没有人真正做任何事情,就会出现一些等待繁忙的行为,但我从您的帖子中假设应用程序的整个目的是运行这些线程,并且它当前正在影响您的处理器。如果必须为其他事情留出空间,您可能需要对此进行调整:)

You can certainly find some improvement in throughput by reducing the number of threads to what the system can realistically handle. Are you open to changing the design of the thread a bit? It'll unburden the scheduler to put the sleeping ones in a queue instead of actually having hundreds of sleeping threads.

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

This preserves your core semantic of 'job repeats indefinitely, but waits at least one minute between executions' but now you can tune the thread pool to something the machine can handle and the ones that aren't working are in a queue instead of loitering about in the scheduler as sleeping threads. There is some wait busy behavior if nobody's actually doing anything, but I am assuming from your post that the entire purpose of the application is to run these threads and it's currently railing your processors. You may need to tune around that if room has to be made for other things :)

浴红衣 2024-09-09 07:17:00

你需要一个信号量。

class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

实例化 AThreads 时,您需要传递相同的信号量实例:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

编辑:谁投票否决了,请解释一下原因?我的解决方案有问题吗?

You need a semaphore.

class AThread extends Thread {
   Semaphore sem;
   AThread(Semaphore sem) {
     this.sem = sem;
   }
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         sem.acquire();
         try {
           //Lots of computation every minute
         } finally {
           sem.release();
         }
      }
   }
}

When instantiating the AThreads you need to pass the same semaphore instance:

Semaphore sem = new Semaphore(MAX_AVAILABLE, true);

Edit: Who voted down can please explain why? There is something wrong in my solution?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文