Java线程这样的好做法吗?

发布于 2024-09-11 00:59:55 字数 3252 浏览 7 评论 0原文

这是一篇很长的文章,所以我必须感谢您的阅读。 我的应用程序应该处理大量声音文件,比如说 4000+。我的第一个方法是加载一定量(比如说 200mb)的声音数据,处理它,写入它,然后“清空”数据以让 gc 释放它。但考虑到数据是通过内网加载的,这似乎不是“最好”的方式。 (文件访问速度慢)应从第一个加载的文件开始计算。为了实现这一目标,我将概念更改为某种“生产者/消费者”(我认为)。到目前为止,这是我的类:

Reader/Producer

public class ReaderThread extends Thread {

    List<Long> files;
    ConcurrentLinkedQueue<Long> loaded = new ConcurrentLinkedQueue<Long>();
    boolean finished = false;

    public ReaderThread( List<Long> soundFiles) {
        this.files = soundFiles;
    }

    @Override
    public void run() {
        Iterator<Long> it = files.iterator();
        while(it.hasNext()) {
            Long id = it.next();
            if (FileLoader.load(id)) {
                loaded.add(id);
            }
        }
        finished = true;
    }

    public Long getNextId() {
        while(loaded.isEmpty()) {
            if( finished ) {
                return null;
            }
        }
        Long id = loaded.poll();
        return id;
    }
}

这是编写者/(不是消费者)

public class WriterThread extends Thread {

    ConcurrentLinkedQueue<Long> loaded = new ConcurrentLinkedQueue<Long>();
    String directory;
    boolean abort = false;

    public WriterThread(String directory) {
        this.directory = directory;
    }

    @Override
    public void run() {

        while(!(abort&&loaded.isEmpty())) {
            if(!loaded.isEmpty()) {
                Long id = loaded.poll();
                FileWriter.write(id, directory);
                FileManager.unload(id);
            }
        }
    }

    public synchronized void submit(Long id) {
        loaded.add(id);
    }

    public synchronized void halt() {
        abort = true;
    }

}

这是所有事物聚集在一起的部分:

    // Forgive me the "t" and "w". ;-)
                    t = new ReaderThread(soundSystem,soundfilesToProcess);
            w = new WriterThread(soundSystem,outputDirectory );

            t.start();
            w.start();

            long start = System.currentTimeMillis();
            while(!abort) {

                Long id = t.getNextId();
                if(id!=null) {

                    SoundFile soundFile = soundSystem.getSoundfile(id);
                    ProcessorChain pc = new ProcessorChain(soundFile, getProcessingChain(), w);
                        Future<List<ProcessorResult>> result = es.submit(pc);
                        results.add(result);

                }else {
                    break;
                }
            }
for( Future<List<ProcessorResult>> result : results) {
            List<ProcessorResult> tempResults;
            try {
                tempResults = result.get();
                processResults(tempResults);

            } catch (InterruptedException e) {

                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }   
w.halt();

“ProcessorChain”是一个可运行的。 es.提交-> “es”是一个CachedThreadPool。

我首先需要知道的是天气或这种方法是否“好”,或者是否更像“废话”。它似乎工作得很好,但我对编写器线程没有什么问题,似乎在某些情况下并非所有文件都被写入。当 ProcessorChain 完成工作时,将调用编写器线程的提交方法。 第二件事是线程安全。我错过了什么吗?

this is some kind of long post, so I have to say thanks for reading.
My app is supposed to process a lot of soundfiles, lets say 4000+. My first approach was to load a certain amount (lets say 200mb) of sound data, process it, write it and then "null" the data to let the gc free it. But regarting to the fact that the data is loaded via intranet, this seems not to be the "best" way. (File access is slow) Calculations should start with the first loaded file. To achive this, I changed the concept to a sort of "producer/consumer" (I think). Here are my classes so far:

Reader/Producer

public class ReaderThread extends Thread {

    List<Long> files;
    ConcurrentLinkedQueue<Long> loaded = new ConcurrentLinkedQueue<Long>();
    boolean finished = false;

    public ReaderThread( List<Long> soundFiles) {
        this.files = soundFiles;
    }

    @Override
    public void run() {
        Iterator<Long> it = files.iterator();
        while(it.hasNext()) {
            Long id = it.next();
            if (FileLoader.load(id)) {
                loaded.add(id);
            }
        }
        finished = true;
    }

    public Long getNextId() {
        while(loaded.isEmpty()) {
            if( finished ) {
                return null;
            }
        }
        Long id = loaded.poll();
        return id;
    }
}

This is the writer/(Not consumer)

public class WriterThread extends Thread {

    ConcurrentLinkedQueue<Long> loaded = new ConcurrentLinkedQueue<Long>();
    String directory;
    boolean abort = false;

    public WriterThread(String directory) {
        this.directory = directory;
    }

    @Override
    public void run() {

        while(!(abort&&loaded.isEmpty())) {
            if(!loaded.isEmpty()) {
                Long id = loaded.poll();
                FileWriter.write(id, directory);
                FileManager.unload(id);
            }
        }
    }

    public synchronized void submit(Long id) {
        loaded.add(id);
    }

    public synchronized void halt() {
        abort = true;
    }

}

This is the part where all things get together:

    // Forgive me the "t" and "w". ;-)
                    t = new ReaderThread(soundSystem,soundfilesToProcess);
            w = new WriterThread(soundSystem,outputDirectory );

            t.start();
            w.start();

            long start = System.currentTimeMillis();
            while(!abort) {

                Long id = t.getNextId();
                if(id!=null) {

                    SoundFile soundFile = soundSystem.getSoundfile(id);
                    ProcessorChain pc = new ProcessorChain(soundFile, getProcessingChain(), w);
                        Future<List<ProcessorResult>> result = es.submit(pc);
                        results.add(result);

                }else {
                    break;
                }
            }
for( Future<List<ProcessorResult>> result : results) {
            List<ProcessorResult> tempResults;
            try {
                tempResults = result.get();
                processResults(tempResults);

            } catch (InterruptedException e) {

                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }

        }   
w.halt();

"ProcessorChain" is a runnable.
es.submit -> "es" is a CachedThreadPool.

What I need to know in the first place is weather or not this approach is "good", or if it is more like "nonsens". It seems to work quite well, but I have little problems with the writer thread, it seems that in some cases not all files get written. The writer threads submit method is called by the ProcessorChain when it has finished work.
The second thing it thread safety. Did I miss somethin?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

话少情深 2024-09-18 00:59:55

我相信,如果每个线程读取、处理然后写入整个声音文件(每个文件一个线程),事情会(很多)简单。

您可以使用 Java 线程池并让操作系统/Java VM 并行读取/处理/写入多个文件以提高效率。我可能是错的,但根据您所描述的,一个更简单的解决方案就足够了,然后您可以衡量您的瓶颈(如果需要进一步改进)。

I believe it will be (a lot) simpler if each thread reads, process and then writes a whole soundfile (one thread per file).

You use can Java thread pools and let the operating system/Java VM parallelize the read/process/write with multiple files to gain eficiency. I may be wrong, but from what you described a simpler solution would be enough and then you can measure your bottleneck if further improvements are needed.

梅倚清风 2024-09-18 00:59:55

我认为这种方法总体来说是可以的(一个线程用于读取输入,一个线程用于写入输出,一个或多个线程用于处理)。

一些建议:

1 - 您可能想要使用信号量,而不是让线程在恒定循环中旋转。例如,使用信号量,您的写入线程只会阻塞,直到文件实际可写入为止。目前它会旋转,当没有什么可写时,可能会浪费 1/3 的 cpu 周期。

2 - 您可能想要显式创建工作线程而不是在主线程上执行工作。这样你就可以有多个线程同时进行处理。
这可能已经是 ProcessorChain 正在做的事情,但从代码片段中并不清楚。

I think the approach is ok in general (one thread for reading input, one for writing output and one or more for processing).

A couple of suggestions:

1 - you probably want to use semaphores instead of having your threads spinning in a constant loop. For example, with a semaphore your write thread would just block until a file was actually available to write. Currently it will spin, potentially wasting 1/3 of your cpu cycles when there's nothing to write.

2 - you probably want to explicitly create worker threads instead of doing the work on the main thread. That way you can have multiple threads doing processing at the same time.
That may already be what ProcessorChain is doing, but it's not clear from the snippet.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文