Java 中的多线程帮助

发布于 2024-12-11 20:01:29 字数 857 浏览 0 评论 0原文

我是 Java 新手,我需要一些帮助来完成这个程序。这是一个大型类项目的一小部分,我必须使用多线程。

这是我想要在算法上做的事情:

while (there is still input left, store chunk of input in <chunk>)
{
    if there is not a free thread in my array then
      wait until a thread finishes

    else there is a free thread then
       apply the free thread to <chunk> (which will do something to chunk and output it).  
              Note: The ordering of the chunks being output must be the same as input
}

那么,我不知道该怎么做的主要事情:

  • 如何检查数组中是否有空闲线程?我知道有一个函数 ThreadAlive,但每次在循环中轮询每个线程似乎效率非常低。
  • 如果没有空闲线程,我如何才能等到一个线程完成?
  • 顺序很重要。如何保留线程输出的顺序?如图所示,输出的顺序需要与输入的顺序匹配。如何保证这种同步?
  • 我如何将块传递到我的线程?我可以只使用 Runnable 接口来做到这一点吗?

非常感谢任何有关这四个项目符号的帮助。因为我是一个超级菜鸟,所以代码示例会有很大帮助。

(旁注:创建一个线程数组只是我的一个想法,用于处理用户定义的线程数。如果您有更好的方法来处理这个问题,欢迎提出建议!)

I'm new to Java, and I need some help working on this program. This is a small part of a large class project, and I must use multithreading.

Here's what I want to do algorithmically:

while (there is still input left, store chunk of input in <chunk>)
{
    if there is not a free thread in my array then
      wait until a thread finishes

    else there is a free thread then
       apply the free thread to <chunk> (which will do something to chunk and output it).  
              Note: The ordering of the chunks being output must be the same as input
}

So, the main things I don't know how to do:

  • How can I check whether or not there's a free thread in the array? I know that there is a function ThreadAlive, but it seems super inefficient to poll every single thread every time in my loop.
  • If there is no free thread, how can I wait until one has finished?
  • The ordering is important. How can I preserve the ordering in which the threads output? As in, the order of the output needs to match the order of the input. How can I guarantee this synchronization?
  • How do I even pass the chunk to my thread? Can I just use the Runnable interface to do this?

Any help with these four bullets is greatly appreciated. Since I'm a super noob, code samples would help significantly.

(side-note: Making an array of threads was just an idea of mine to handle the user defined number of threads. If you have a better way to handle this you're welcome to suggest it!)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

睫毛溺水了 2024-12-18 20:01:29

听起来你基本上有一个生产者/消费者模型,可以通过 ExecutorService 和 BlockingQueue 来解决。这是一个具有类似答案的类似问题:

生产者/消费者工作队列

Sounds like you basically have a producer/consumer model and can be solved with an ExecutorService and BlockingQueue. Here is a similar question with a similar answer:

producer/consumer work queues

苯莒 2024-12-18 20:01:29

正如 @altaiojok 提到的,您想要使用 ExecutorService 阻塞队列。基本算法的工作原理如下:

ExecutorService executor = Executors.newFixedThreadPool(...); //or newCachedThreadPool, etc...
BlockingQueue<Future<?>> outputQueue = new LinkedBlockingQueue<Future<?>>();


//To be run by an input processing thread
void submitTasks() {
    BufferedReader input = ... //going to assume you have a file that you want to read, this could be any method of input (file, keyboard, network, etc...)
    String line = input.readLine();
    while(line != null) {
        outputQueue.add(executor.submit(new YourCallableImplementation(line)));
        line = input.readLine();
    }
}

//To be run by a different output processing thread
void processTaskOutput() {
    try {
        while(true) {
            Future<?> resultFuture = outputQueue.take();
            ? result = resultFuture.get();
            //process the output (write to file, send to network, print to screen, etc...
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

我将让您弄清楚如何实现 Runnable 来创建输入和输出线程以及如何实现 Callable您需要处理的任务。

As @altaiojok mentioned, you want to use an ExecutorService and BlockingQueue. The basic algorithm works like this:

ExecutorService executor = Executors.newFixedThreadPool(...); //or newCachedThreadPool, etc...
BlockingQueue<Future<?>> outputQueue = new LinkedBlockingQueue<Future<?>>();


//To be run by an input processing thread
void submitTasks() {
    BufferedReader input = ... //going to assume you have a file that you want to read, this could be any method of input (file, keyboard, network, etc...)
    String line = input.readLine();
    while(line != null) {
        outputQueue.add(executor.submit(new YourCallableImplementation(line)));
        line = input.readLine();
    }
}

//To be run by a different output processing thread
void processTaskOutput() {
    try {
        while(true) {
            Future<?> resultFuture = outputQueue.take();
            ? result = resultFuture.get();
            //process the output (write to file, send to network, print to screen, etc...
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

I'll leave it to you to figure out how to implement Runnable to make the input and output thread as well as how to implement Callable for the tasks you need to process.

人间☆小暴躁 2024-12-18 20:01:29

我建议使用 commons-pool 它提供线程池,以便您可以轻松限制使用的线程数量它还提供了一些其他辅助方法。

关于排序:查看 synchronize 关键字。

我建议看一下java教程(关于并发的部分): http://download.oracle.com/javase/tutorial/essential/concurrency/index.html

I would suggest using commons-pool which offers pooling of threads so you can easily limit the number of used threads and it also offers some other helper methods.

Concerning the ordering: have a look at the synchronize keyword.

And I would suggest to have a look at the java tutorial (the part about concurrency): http://download.oracle.com/javase/tutorial/essential/concurrency/index.html

我三岁 2024-12-18 20:01:29

流可能会派上用场:

List<Chunk> chunks = new ArrayList<>();
//....
Function<Chunk, String> toWeightInfo = (chunk) -> "weight = "+(chunk.size()*chunk.prio());

List<String> results = chunks.parallelStream()
  .map(toWeightInfo)
  .collect(Collectors.toList());

System.out.println(results);

并行流使用系统默认的“fork/join”线程池,该线程池应该是可用逻辑 CPU 的大小并并行处理您的内容。它还保证结果的顺序相同。
并行流 API 隐藏了将空闲线程分配给作业和优化(例如窃取工作)的所有复杂性。只要给它一些咀嚼的东西,它就会发挥它的魔力。

如果需要使用自定义大小的线程池,请参考
Java 8 并行流中的自定义线程池问题。
您也可以看看这个很好的 Java 8 Stream 教程

如果您的情况相当复杂,并且您正在将块流式传输到程序中,并且您有多个工作阶段,其中有些必须是串行的,有些可以并行,有些相互依赖,那么您可能会看看< a href="https://lmax-exchange.github.io/disruptor/" rel="nofollow noreferrer">LMAX 的 Disruptor 框架。

亲切的问候

Streams might come handy:

List<Chunk> chunks = new ArrayList<>();
//....
Function<Chunk, String> toWeightInfo = (chunk) -> "weight = "+(chunk.size()*chunk.prio());

List<String> results = chunks.parallelStream()
  .map(toWeightInfo)
  .collect(Collectors.toList());

System.out.println(results);

The parallel stream uses the System's default "fork/join" thread pool, which should be the size of available logical CPUs and processes your stuff in parallel. It also guarantees the same order of results.
The parallel streams API hides all the complexity of assigning free threads to jobs and optimizations like work-stealing away from you. Just give it something to chew on and it will work its magic.

If you need to use a thread pool of a custom size, please refer to the
Custom thread pool in Java 8 parallel stream question.
You might also have a look at this good Java 8 Stream Tutorial.

If your case is rather complex and you're streaming chunks into your program, and you've got multiple stages of work, where some must be serial and some can be parallel and some depend on each other, you might have a look at the Disruptor framework from LMAX.

Kind regards

自演自醉 2024-12-18 20:01:29

使用 ExecutorCompletionServiceFuture。它们共同提供了一个基于线程池的任务框架,可以解决您的所有问题。

如何检查数组中是否有空闲线程?我知道有一个函数 ThreadAlive,但每次在我的循环中轮询每个线程似乎效率非常低。

你不必这样做。执行器将以(超)高效的方式为您完成此操作。您只需向它提交任务并坐下来即可。

如果没有空闲线程,我如何才能等到一个线程结束?

再说一次,你真的不必这样做。这由执行者负责。

顺序很重要。如何保留线程输出的顺序?如图所示,输出的顺序需要与输入的顺序匹配。我如何保证这种同步?

这是一个问题。如果您希望处理后的输出(用您的话来说是块)以与这些块在初始数组中存在的顺序相同的顺序到达,则必须解决以下几点:

是否只是结果的到达顺序重要或者是任务处理本身依赖于顺序?如果是前者,那么很容易做到,但如果是后者,那就有问题了。 (考虑到您承认自己是 Java 新手,我认为这是非常困难的事情,因此我建议您在尝试此操作之前进行更多学习。)

假设是前一种情况,您可以做的是: 提交按某种顺序将块发送给执行器,每次提交都会为您提供任务处理输出的句柄(称为 Future)。将这些句柄存储在有序队列中,当您需要结果时,对这些 Future 调用 get()。请注意,如果订单中间的某些任务需要很长时间才能完成,那么后续任务的结果也会延迟。

我如何将块传递到我的线程?我可以只使用 Runnable 接口来执行此操作吗?

创建一个 Callable 实例,将每个块包装到实例中。这代表您将 submit()ExecutorService 的任务。

Use ExecutorCompletionService and Future<T>. Together they provide a threadpool based task framework that takes care of all your concerns.

How can I check whether or not there's a free thread in the array? I know that there is a function ThreadAlive, but it seems super inefficient to poll every single thread every time in my loop.

You dont have to. The executor will do this for you in an (super)efficient manner.You just have to submit tasks to it and sit back.

If there is no free thread, how can I wait until one has finished?

Again , you really dont have to. This is taken care of by executor.

The ordering is important. How can I preserve the ordering in which the threads output? As in, the order of the output needs to match the order of the input. How can I guarantee this synchronization?

This is a concern. If you want the processed output ( of chunks, in your words ) to arrive in the same order as these chunks are present in the initial array, you have to address a few points :

Is it just the order of arrival of the results that matter , or is it that the tasks processing themselves have dependencies on the order ? If it is the former , it is much easily done, but if its the later , then you have problems. ( which I think are very hard things to start with considering your admission of being new to Java, so I would just recommend more learning on your part before attempting this. )

Assuming it is the former case , what you can do is this : Submit the chunks to the executor in some order , and each submission will give you a handle ( called a Future<Result> ) to the task processed output. Store these handles in a ordered queue, and when you want the results , call the get() on these Future(s). Note that if some task in the middle of the order takes long time to complete , then the results of the following tasks will also be delayed.

How do I even pass the chunk to my thread? Can I just use the Runnable interface to do this?

Create a Callable instance wrapping one chunk each into the instance. This represents your task that you will submit() to the ExecutorService.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文