How to limit the number of threads in Java

Posted on 2024-12-23 14:10:06

Let's say I have 1000 files to read and, because of some limits, I want to read at most 5 files in parallel. As soon as one of them finishes, I want a new one to start.

I have a main function that holds the list of files, and I try to update a counter whenever a thread finishes, but it doesn't work!

Any suggestions?

The following is the loop in the main function:

for (final File filename : folder.listFiles()) {

    Object lock1 = new Object();
    new myThread(filename, lock1).start();
    counter++;
    while (counter > 5);
}

Comments (5)

北风几吹夏 2024-12-30 14:10:06

Spawning threads like this is not the way to go. Use an ExecutorService with a fixed pool of 5 threads. Put all the files in a BlockingQueue or another thread-safe collection, and let each worker poll() it until it is empty.

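import java.io.File;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadReader {

    public static void main(String[] args) {
        // Folder to scan. Reading it from args[0] is an assumption; the original left it unset.
        File f = new File(args[0]);
        File[] kids = f.listFiles(); // null if f is not a readable directory
        if (kids == null || kids.length == 0) {
            return;
        }

        // Size the queue to the file count so add() cannot fail on a full queue.
        final BlockingQueue<File> queue = new ArrayBlockingQueue<File>(kids.length);
        for (File kid : kids) {
            queue.add(kid);
        }

        ExecutorService pool = Executors.newFixedThreadPool(5);

        // Five workers, each draining the shared queue until it is empty.
        for (int i = 1; i <= 5; i++) {
            Runnable r = new Runnable() {
                public void run() {
                    File workFile = null;
                    while ((workFile = queue.poll()) != null) {
                        // work on the file.
                    }
                }
            };
            pool.execute(r);
        }

        // Accept no new tasks; the JVM can exit once the workers finish.
        pool.shutdown();
    }
}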
红玫瑰 2024-12-30 14:10:06

You can use an ExecutorService as a thread pool AND a queue.

ExecutorService pool = Executors.newFixedThreadPool(5);
File f = new File(args[0]);

for (final File kid : f.listFiles()) {
    pool.execute(new Runnable() {
        @Override
        public void run() {
            process(kid);
        }
    });
}
pool.shutdown();
// wait for them to finish for up to one minute.
pool.awaitTermination(1, TimeUnit.MINUTES);
所有深爱都是秘密 2024-12-30 14:10:06

The approach in Kylar's answer is the correct one. Use the executor classes provided by the Java class libraries rather than implementing thread pooling yourself from scratch (badly).

But I thought it might be useful to discuss the code in your question and why it doesn't work. (I've filled in some of the parts that you left out as best I can ...)

public class MyThread extends Thread {

    private static int counter;

    public MyThread(File fileName, Object lock) {
        // Save parameters in instance variables
    }

    public void run() {
        // Do stuff with instance variables
        counter--;
    }

    public static void main(String[] args) {
        // ...
        for (final File filename : folder.listFiles()) {
            Object lock1 = new Object();
            new MyThread(filename, lock1).start();
            counter++;
            while (counter > 5);
        }
        // ...
    }
}

OK, so what is wrong with this? Why doesn't it work?

The first problem is that main reads and writes counter without any synchronization. I assume that the worker threads update it too; the code makes no sense otherwise. That means there is a good chance that the main thread won't see the updates made by the child threads. In other words, while (counter > 5); could be an infinite loop. (In fact, this is pretty likely. The JIT compiler is allowed to generate code in which counter > 5 simply tests the value of counter left in a register after the previous counter++; statement.) In addition, counter++ and counter-- are not atomic, so concurrent updates can be lost even when they are visible.

The second problem is that your while (counter > 5); loop is incredibly wasteful of resources. You are telling the JVM to poll a variable, and it will do this potentially BILLIONS of times a second, running one processor (core) flat out. You shouldn't do that. If you are going to implement this kind of thing using low-level primitives, you should use Java's Object.wait() and Object.notify() methods; e.g. the main thread waits, and each worker thread notifies.
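As a rough illustration of that wait/notify idea, here is a minimal sketch rather than a recommended implementation (the executor-based answers remain the better choice). The class WaitNotifyLimiter, its demo method, and the 10 ms sleep standing in for reading a file are all hypothetical and not part of the question's code. It assumes Java 8 or later for the lambdas.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical class, not from the question: starts worker threads but never
// lets more than `limit` of them run at the same time.
public class WaitNotifyLimiter {

    private final Object lock = new Object();
    private final int limit;
    private int running = 0; // guarded by lock
    private int peak = 0;    // guarded by lock; kept only so the demo can report it

    public WaitNotifyLimiter(int limit) {
        this.limit = limit;
    }

    // Blocks the caller until a slot is free, then runs the task in a new thread.
    public Thread start(Runnable task) throws InterruptedException {
        synchronized (lock) {
            while (running >= limit) {
                lock.wait(); // releases the lock and sleeps instead of spinning
            }
            running++;
            peak = Math.max(peak, running);
        }
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } finally {
                synchronized (lock) {
                    running--;
                    lock.notifyAll(); // wake the waiting starter thread
                }
            }
        });
        worker.start();
        return worker;
    }

    public int peak() {
        synchronized (lock) {
            return peak;
        }
    }

    // Starts taskCount short tasks and returns the highest concurrency observed.
    public static int demo(int taskCount, int limit) {
        WaitNotifyLimiter limiter = new WaitNotifyLimiter(limit);
        List<Thread> workers = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                workers.add(limiter.start(() -> sleepQuietly(10)));
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return limiter.peak();
    }

    // Stand-in for reading one file.
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + demo(20, 5));
    }
}
```

Because every read and write of running happens inside synchronized (lock), the starter thread always sees the workers' updates, and wait() parks it without burning a core. The while loop around wait() re-checks the condition after each wake-up, which also covers spurious wake-ups.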

岁月苍老的讽刺 2024-12-30 14:10:06

Whatever method you use to create a new thread, keep a global counter of running threads and wrap the thread creation in a conditional: if the limit has been reached, don't create a new thread; push the file onto a queue (a list?) instead. Then add a second check after a thread is created: if there are items in the queue, process those first.

逆光下的微笑 2024-12-30 14:10:06

We can use the Semaphore class as well.
A Semaphore limits the number of threads executing a block of code at the same time. It has methods such as acquire() and release() that take and return a permit, much like acquiring and releasing a lock.
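One way this could look is sketched below, under a few assumptions: the class SemaphoreLimiter, its demo method, and the 10 ms sleep standing in for reading a file are hypothetical names for illustration only. Here the starting thread takes a permit before it creates each worker and the worker hands the permit back when it finishes, so at most `limit` worker threads are alive at once. It assumes Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, not from the answer: bounds the number of live workers with a Semaphore.
public class SemaphoreLimiter {

    // Starts taskCount short tasks and returns the highest concurrency observed.
    public static int demo(int taskCount, int limit) {
        Semaphore permits = new Semaphore(limit);
        AtomicInteger running = new AtomicInteger(); // only used to measure concurrency
        AtomicInteger peak = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        try {
            for (int i = 0; i < taskCount; i++) {
                permits.acquire(); // blocks while `limit` workers are still running
                Thread worker = new Thread(() -> {
                    try {
                        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                        sleepQuietly(10); // stand-in for reading one file
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // lets the starter create the next worker
                    }
                });
                workers.add(worker);
                worker.start();
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + demo(20, 5));
    }
}
```

A permit does not have to be released by the thread that acquired it, which is what allows the starter to acquire and the worker to release. Acquiring inside each worker instead would also cap the guarded block at `limit`, but it would still create one thread per file up front.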
