Concurrent directory traversal algorithm hang problem

Posted on 2024-07-25 14:47:25

I have created a concurrent, recursive directory traversal and file processing program that sometimes hangs: all parallel computations finish, but the 'primary' thread never continues with its other tasks.

The code is basically a fork-join style concurrent aggregator, and after the parallel aggregation completes, it should display the results in a Swing window. The trouble with the aggregation is that it needs to generate a tree and aggregate the statistics of the leaf nodes up in the hierarchy.

I'm sure I made a concurrency mistake, but I can't find it. I included the relevant part of my code at the end of the post (code comments removed for brevity; sorry for the 150 lines. If required, I could move it to an external location).

Context: Java 6u13, Windows XP SP3, Core 2 Duo CPU.

My questions are:

What could be the cause for this random hang?

Is there a better way of doing a concurrent directory traversal, perhaps in a form of an already existing library?

Would the fork-join framework from Doug Lea (or Java 7) be a better framework for the aggregation / directory traversal, and if so, how should I rethink my implementation at the conceptual level?

Thank you for your time.

And the code excerpt:

private static JavaFileEvaluator[] processFiles(File[] files) 
throws InterruptedException {
    CountUpDown count = new CountUpDown();
    ThreadPoolExecutor ex = (ThreadPoolExecutor)Executors
    .newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    JavaFileEvaluator[] jfes = new JavaFileEvaluator[files.length];
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        jfes[i] = new JavaFileEvaluator(files[i], count, ex);
        ex.execute(jfes[i]);
    }
    count.await();
    for (int i = 0; i < jfes.length; i++) {
        count.increment();
        final JavaFileEvaluator jfe = jfes[i];
        ex.execute(new Runnable() {
            public void run() {
                jfe.aggregate();
            }
        });

    }
    // -------------------------------------
    // this await sometimes fails to wake up
    count.await(); // <---------------------
    // -------------------------------------
    ex.shutdown();
    ex.awaitTermination(0, TimeUnit.MILLISECONDS);
    return jfes;
}
public class JavaFileEvaluator implements Runnable {
    private final File srcFile;
    private final Counters counters = new Counters();
    private final CountUpDown count;
    private final ExecutorService service;
    private List<JavaFileEvaluator> children;
    public JavaFileEvaluator(File srcFile, 
            CountUpDown count, ExecutorService service) {
        this.srcFile = srcFile;
        this.count = count;
        this.service = service;
    }
    public void run() {
        try {
            if (srcFile.isFile()) {
                JavaSourceFactory jsf = new JavaSourceFactory();
                JavaParser jp = new JavaParser(jsf);
                try {
                    counters.add(Constants.FILE_SIZE, srcFile.length());
                    countLines();
                    jp.parse(srcFile);
                    Iterator<?> it = jsf.getJavaSources();
                    while (it.hasNext()) {
                        JavaSource js = (JavaSource)it.next();
                        js.toString();
                        processSource(js);
                    }
                // Some catch clauses here
                }
            } else
            if (srcFile.isDirectory()) {
                processDirectory(srcFile);
            }
        } finally {
            count.decrement();
        }
    }
    public void processSource(JavaSource js) {
        // process source, left out for brevity
    }
    public void processDirectory(File dir) {
        File[] files = dir.listFiles(new FileFilter() {
            @Override
            public boolean accept(File pathname) {
                return 
                (pathname.isDirectory() && !pathname.getName().startsWith("CVS") 
                 && !pathname.getName().startsWith("."))
                || (pathname.isFile() && pathname.getName().endsWith(".java") 
                 && pathname.canRead());
            }
        });
        if (files != null) {
            Arrays.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    if (o1.isDirectory() && o2.isFile()) {
                        return -1;
                    } else
                    if (o1.isFile() && o2.isDirectory()) {
                        return 1;
                    }
                    return o1.getName().compareTo(o2.getName());
                }
            });
            for (File f : files) {
                if (f.isFile()) {
                    counters.add(Constants.FILE, 1);
                } else {
                    counters.add(Constants.DIR, 1);
                }
                JavaFileEvaluator ev = new JavaFileEvaluator(f, count, service);
                if (children == null) {
                    children = new ArrayList<JavaFileEvaluator>();
                }
                children.add(ev);
                count.increment();
                service.execute(ev);
            }
        }
    }
    public Counters getCounters() {
        return counters;
    }
    public boolean hasChildren() {
        return children != null && children.size() > 0;
    }
    public void aggregate() {
        // recursively aggregate non-leaf nodes
        if (!hasChildren()) {
            count.decrement();
            return;
        }
        for (final JavaFileEvaluator e : children) {
            count.increment();
            service.execute(new Runnable() {
                @Override
                public void run() {
                    e.aggregate();
                }
            });
        }
        count.decrement();
    }
}
public class CountUpDown {
    private final Lock lock = new ReentrantLock();
    private final Condition cond = lock.newCondition();
    private final AtomicInteger count = new AtomicInteger();
    public void increment() {
        count.incrementAndGet();
    }
    public void decrement() {
        int value = count.decrementAndGet();
        if (value == 0) {
            lock.lock();
            try {
                cond.signalAll();
            } finally {
                lock.unlock();
            }
        } else
        if (value < 0) {
            throw new IllegalStateException("Counter < 0 :" + value);
        }
    }
    public void await() throws InterruptedException {
        lock.lock();
        try {
            if (count.get() > 0) {
                cond.await();
            }
        } finally {
            lock.unlock();
        }
    }
}

Edit: Added the hasChildren() method to JavaFileEvaluator.

Comments (1)

回眸一遍 2024-08-01 14:47:25

In the aggregate method of JavaFileEvaluator, count.decrement() is not called in a finally block. If any RuntimeExceptions are thrown inside the aggregate function (possibly in the hasChildren method, which I don't see the body of?), the call to decrement will never happen and CountUpDown will stay in await indefinitely. This may be the cause of the random hang you are seeing.
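A minimal sketch of that suggestion follows. It is not the asker's code: `AggregateFinallyDemo`, `Node`, the `failing` flag, and the plain `AtomicInteger` used in place of `CountUpDown` are all hypothetical stand-ins chosen to keep the example self-contained. It only illustrates that putting the decrement in a `finally` block keeps the outstanding-task count balanced even when one aggregation step throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AggregateFinallyDemo {

    // Hypothetical stand-in for JavaFileEvaluator, reduced to the aggregate step.
    static final class Node {
        final List<Node> children = new ArrayList<Node>();
        final boolean failing; // simulates a RuntimeException during aggregation

        Node(boolean failing) {
            this.failing = failing;
        }

        void aggregate(final AtomicInteger pending, final ExecutorService service) {
            try {
                if (failing) {
                    throw new IllegalStateException("simulated aggregation failure");
                }
                for (final Node child : children) {
                    pending.incrementAndGet(); // register the task before submitting it
                    service.execute(new Runnable() {
                        public void run() {
                            try {
                                child.aggregate(pending, service);
                            } catch (RuntimeException ex) {
                                // A real program would log or record the failure.
                                System.err.println("aggregate failed: " + ex.getMessage());
                            }
                        }
                    });
                }
            } finally {
                pending.decrementAndGet(); // runs on every exit path, including exceptions
            }
        }
    }

    // Returns the number of tasks still registered after the pool has drained,
    // or -1 if the pool did not drain in time.
    public static int runDemo() {
        AtomicInteger pending = new AtomicInteger();
        ExecutorService service = Executors.newFixedThreadPool(2);
        Node root = new Node(false);
        root.children.add(new Node(false));
        root.children.add(new Node(true)); // this child throws inside aggregate()
        root.children.add(new Node(false));

        pending.incrementAndGet();        // register the root
        root.aggregate(pending, service); // submits the three leaf tasks
        service.shutdown();               // safe here: the leaves submit nothing further
        try {
            if (!service.awaitTermination(10, TimeUnit.SECONDS)) {
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return pending.get();
    }

    public static void main(String[] args) {
        System.out.println("pending after aggregation: " + runDemo());
    }
}
```

In this sketch the count returns to zero although one child fails. Without the `finally`, the failing child's decrement would be skipped and a waiter on the counter would never be released. Whether this is the actual cause of the hang in the question is not established here.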

For the second question, I don't know of any Java libraries for doing this, but I haven't really looked. Sorry for the non-answer; this isn't something I've had any opportunity to use before.

As far as the third question goes, I think that whether you use a fork-join framework provided by someone else or continue providing your own concurrency framework, the biggest gain would be in separating the logic that does the work of traversing the directories from the logic that manages the parallelism. The code you provided uses the CountUpDown class to keep track of when all threads are finished, and you end up with calls to increment/decrement sprinkled throughout the methods dealing with the directory traversal, which will lead to nightmares when tracking down bugs. Moving to the Java 7 fork-join framework will force you to create a class that deals only with the actual traversal logic and leaves the concurrency logic to the framework, which may be a good way for you to go. The other option is to keep going with what you have here but make a clear delineation between the management logic and the work logic, which would help you track down and fix these sorts of bugs.
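As a rough illustration of that separation, here is a sketch using the Java 7 `ForkJoinPool` and `RecursiveTask`. The class name `DirStatsTask`, the `scan` helper, and the choice of statistics (number of `.java` files and their total size) are assumptions made for the example. It does not reproduce the question's parsing, counters, or directory filter.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: result[0] = number of .java files, result[1] = their total bytes.
public class DirStatsTask extends RecursiveTask<long[]> {
    private final File dir;

    public DirStatsTask(File dir) {
        this.dir = dir;
    }

    @Override
    protected long[] compute() {
        long files = 0;
        long bytes = 0;
        List<DirStatsTask> subtasks = new ArrayList<DirStatsTask>();

        File[] entries = dir.listFiles(); // null if dir is unreadable or not a directory
        if (entries != null) {
            for (File f : entries) {
                if (f.isDirectory()) {
                    DirStatsTask task = new DirStatsTask(f);
                    task.fork(); // schedule the subdirectory asynchronously
                    subtasks.add(task);
                } else if (f.isFile() && f.getName().endsWith(".java")) {
                    files++;
                    bytes += f.length();
                }
            }
        }

        // Joining aggregates the children into the parent; no manual counter is needed.
        for (DirStatsTask task : subtasks) {
            long[] childResult = task.join();
            files += childResult[0];
            bytes += childResult[1];
        }
        return new long[] { files, bytes };
    }

    // Convenience entry point: runs the traversal in its own pool and shuts it down.
    public static long[] scan(File root) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new DirStatsTask(root));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] stats = scan(new File(args.length > 0 ? args[0] : "."));
        System.out.println(stats[0] + " .java files, " + stats[1] + " bytes");
    }
}
```

The traversal class contains no increment/decrement bookkeeping: `fork()` and `join()` express both the parallelism and the bottom-up aggregation, and an exception thrown in a subtask is rethrown at `join()` rather than silently leaving a waiter blocked. One caveat is that fork-join is designed for CPU-bound work, so heavy blocking file I/O inside `compute()` may limit the speedup.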
