Out-of-memory error when parallelizing merge sort

Posted on 2024-11-05 08:54:21

I am trying to parallelize my merge sort implementation: http://pastebin.com/2uMGjTxr.
I want to create as many threads as the Java VM can provide, and I want to determine the maximum possible number of threads using java.lang.Runtime.

So I came up with a class called MergeThread:

public class MergeThread implements Runnable{

    public int[] list;
    int sIndex, eIndex;

    public MergeThread(int[] pArray, int pStartIndex, int pEndIndex){
        list = pArray;
        sIndex = pStartIndex;
        eIndex = pEndIndex;
    }

    public void run(){
        list = mergeSort(list, sIndex, eIndex);
    }

    /**
     * Merges two sorted int array into one new sorted array.
     * @param lhs
     * @param rhs
     * @return
     */
    private static int[] merge(int[] lhs, int[] rhs) {
        int[] result = new int[lhs.length + rhs.length];

        int leftIndex = 0;
        int rightIndex = 0;
        while(leftIndex < lhs.length && rightIndex < rhs.length) {
            if(lhs[leftIndex] <= rhs[rightIndex]) {
                result[leftIndex + rightIndex] = lhs[leftIndex];
                leftIndex++;
            } else {
                result[leftIndex + rightIndex] = rhs[rightIndex];
                rightIndex++;
            }
        }

        while(leftIndex < lhs.length) {
            result[leftIndex + rightIndex] = lhs[leftIndex];
            leftIndex++;
        }

        while(rightIndex < rhs.length) {
            result[leftIndex + rightIndex] = rhs[rightIndex];
            rightIndex++;
        }

        return result;
    }

    /**
     * Sorts an array from index <code>startIndex</code> (inclusive) to <code>endIndex</code> (exclusive).
     * @param array
     * @param startIndex
     * @param endIndex
     * @return new array that is sorted
     */
    private static int[] mergeSort(int[] array, int startIndex, int endIndex) {
        int length = endIndex - startIndex;
        if(length == 0) {
            return new int[]{};
        }
        if(length == 1) {
            return new int[]{array[startIndex]};
        }

        int halfLength = length / 2;
        //int[] sortedLeftPart = mergeSort(array, startIndex, startIndex + halfLength);
        MergeThread m1 = new MergeThread(array, startIndex, startIndex + halfLength);
        Thread t1 = new Thread(m1);
        t1.start();
        //int[] sortedRightPart = mergeSort(array, startIndex + halfLength, endIndex);
        MergeThread m2 = new MergeThread(array, startIndex + halfLength, endIndex);
        Thread t2 = new Thread(m2);
        t2.start();
        try{
        t1.join();
        t2.join();
        }catch(InterruptedException e){}
        return merge(m1.list, m2.list);     
    }
}

And a class that actually starts the sort:

import java.util.Random;

public class Aufg2 {
    public static Random random = new Random(100);

    public static void main(String[] args) {
        int[] array = createRandomArray(10000000);

        long time = System.currentTimeMillis();

        int[] sortedArray = sort(array);

        if(sortedArray.length != array.length || !isSorted(sortedArray)) {
            System.err.println("Failed to sort given array! :-(");
            return;
        }       
        System.out.println("Success! Sorting took " + (System.currentTimeMillis() - time) + "ms.");     
    }

    /**
     * Creates a randomly filled array of given length
     * @param length
     * @return
     */
    private static int[] createRandomArray(int length) {
        int[] result = new int[length];
        for(int i = 0; i < length; i++) {
            result[i] = random.nextInt();
        }
        return result;
    }

    /**
     * Checks whether a given int array is sorted in ascending order  
     * @param array
     * @return <code>true</code> if the given int array is sorted; <code>false</code> otherwise.
     */
    private static boolean isSorted(int[] array) {
        for(int i = 1; i < array.length; i++) {
            if(array[i] < array[i-1]) {
                return false;
            }
        }
        return true;
    }   

    /**
     * Sorts a given array (ascending order)
     * @param array
     * @return
     */
    private static int[] sort(int[] array){
        //TODO: use multiple threads to speed up the sorting
        MergeThread m = new MergeThread(array, 0, array.length);

        try{

        Thread t1 = new Thread(m);
        t1.start();
        t1.join();
        }catch(InterruptedException e){

        }
        return m.list;
    }
}

However, this merge sort does not work. The console prints many errors of the form java.lang.OutOfMemoryError: unable to create new native thread.

Later the message changes to something about the Java heap.

What do I have to change to get the merge sort working, and how do I use java.lang.Runtime for that?

Comments (3)

巡山小妖精 2024-11-12 08:54:21

The divide-and-conquer mechanism has you trying to create something like 5,000,000 threads, and each of those wants 256 KB (IIRC) of stack memory by default. Still surprised that you get an OutOfMemoryError?

Limit the number of threads by using a fixed-size thread pool. Experiment a bit with the number of threads in the pool, but anything much more than the number of cores in your system is very unlikely to improve performance (and may indeed reduce it).
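A minimal sketch of the fixed-size pool idea, under some assumptions of mine: the class name PooledMergeSort is hypothetical, the pool size is taken from Runtime.getRuntime().availableProcessors() (which reports cores visible to the JVM, not a maximum thread count), and each task sorts its chunk with Arrays.sort instead of the question's recursive mergeSort. The sorted chunks are then merged one by one on the calling thread, which is simple rather than optimal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PooledMergeSort {

    /** Returns a new sorted array; the input array is not modified. */
    static int[] sort(int[] array) {
        // One worker per core reported by the JVM (assumption: a sensible starting point).
        int workers = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            // Split the input into at most `workers` chunks and sort them in parallel.
            List<Future<int[]>> futures = new ArrayList<>();
            int chunk = (array.length + workers - 1) / workers; // ceiling division
            for (int start = 0; start < array.length; start += chunk) {
                final int from = start;
                final int to = Math.min(array.length, start + chunk);
                futures.add(pool.submit(() -> {
                    int[] part = Arrays.copyOfRange(array, from, to);
                    Arrays.sort(part); // sequential sort inside each task
                    return part;
                }));
            }
            // Merge the sorted chunks sequentially on the calling thread.
            int[] result = new int[0];
            for (Future<int[]> f : futures) {
                result = merge(result, f.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** Merges two sorted arrays into one new sorted array. */
    private static int[] merge(int[] lhs, int[] rhs) {
        int[] result = new int[lhs.length + rhs.length];
        int l = 0, r = 0;
        while (l < lhs.length && r < rhs.length) {
            result[l + r] = lhs[l] <= rhs[r] ? lhs[l++] : rhs[r++];
        }
        while (l < lhs.length) result[l + r] = lhs[l++];
        while (r < rhs.length) result[l + r] = rhs[r++];
        return result;
    }

    public static void main(String[] args) {
        int[] sorted = sort(new int[] {5, -2, 9, 0, 3});
        System.out.println(Arrays.toString(sorted));
    }
}
```

With this layout the number of threads is bounded by the pool size no matter how large the array is, so the "unable to create new native thread" error should not occur.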

风苍溪 2024-11-12 08:54:21

First of all, use an ExecutorService and queue new tasks in it instead of creating millions of threads. That should get rid of the first problem; you run out of resources sooner or later if you create millions of threads. 1.5 times the number of cores is usually a good guess for the pool size (it often gives better results than using exactly the available number of cores, but that's something you have to play with).

And then, absolutely important if you want this algorithm to be anywhere near performant: use a quicksort for the leaf case at a reasonable threshold, or an insertion sort if you want a lower threshold (with insertion sort, a leaf size of 16 or so should work fine).
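The two suggestions above (queue tasks in a pool, switch to a sequential sort below a threshold) could be combined roughly as follows. This is a sketch under assumptions of mine, not the answerer's code: it uses ForkJoinPool, which is one kind of ExecutorService designed for tasks that wait on subtasks; the class name ThresholdMergeSort and the THRESHOLD value are hypothetical; and the leaf case calls Arrays.sort, which for int[] is documented as a dual-pivot quicksort.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class ThresholdMergeSort extends RecursiveAction {
    // Hypothetical cutoff; tune it by measurement.
    private static final int THRESHOLD = 1 << 13;

    private final int[] data;    // array being sorted in place
    private final int[] scratch; // shared buffer; each task only touches [from, to)
    private final int from;      // inclusive
    private final int to;        // exclusive

    ThresholdMergeSort(int[] data, int[] scratch, int from, int to) {
        this.data = data;
        this.scratch = scratch;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            Arrays.sort(data, from, to); // leaf case: sequential sort, no new tasks
            return;
        }
        int mid = from + (to - from) / 2;
        // Run both halves as pool tasks and wait for both to finish.
        invokeAll(new ThresholdMergeSort(data, scratch, from, mid),
                  new ThresholdMergeSort(data, scratch, mid, to));
        merge(mid);
    }

    /** Merges the sorted ranges [from, mid) and [mid, to) back into data. */
    private void merge(int mid) {
        System.arraycopy(data, from, scratch, from, to - from);
        int l = from, r = mid, out = from;
        while (l < mid && r < to) {
            data[out++] = scratch[l] <= scratch[r] ? scratch[l++] : scratch[r++];
        }
        while (l < mid) data[out++] = scratch[l++];
        while (r < to) data[out++] = scratch[r++];
    }

    /** Sorts the given array in place. */
    static void sort(int[] array) {
        int parallelism = Math.max(1, Runtime.getRuntime().availableProcessors());
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.invoke(new ThresholdMergeSort(array, new int[array.length], 0, array.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] array = {4, 1, 3, 2};
        sort(array);
        System.out.println(Arrays.toString(array));
    }
}
```

A plain fixed-size pool is a poor fit for tasks that block on their children, because every worker can end up waiting; ForkJoinPool workers keep executing other queued subtasks while a parent waits in invokeAll, which is why it is used in this sketch.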

悲凉≈ 2024-11-12 08:54:21

Let one thread do the second half of the array while the calling thread handles the first half:

    int halfLength = length / 2;
    MergeThread m2 = new MergeThread(array, startIndex + halfLength, endIndex);
    Thread t2 = new Thread(m2);
    t2.start(); // let the new thread handle the second half
    // sort the first half on the calling thread
    int[] sortedLeft = mergeSort(array, startIndex, startIndex + halfLength);
    try {
        t2.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
    return merge(sortedLeft, m2.list);

This halves the number of threads created.

But quicksort is much easier to parallelize, because it needs no post-recursion step: a thread (a Runnable job submitted to an executor) can return immediately after delegating its sub-ranges.

The caller then only needs to watch for when all jobs are done.
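One way to sketch that idea, with details that are my own assumptions rather than the answerer's: a fixed-size pool, an AtomicInteger counting outstanding jobs, a CountDownLatch the caller waits on, a Hoare partition with a middle pivot, and a hypothetical CUTOFF below which a range is sorted sequentially with Arrays.sort. Each job partitions its range, submits the two sub-ranges, and returns without waiting for them.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ParallelQuickSort {
    private static final int CUTOFF = 1 << 12; // hypothetical sequential cutoff

    private final int[] data;
    private final ExecutorService pool;
    private final AtomicInteger pending = new AtomicInteger();    // jobs submitted but not finished
    private final CountDownLatch done = new CountDownLatch(1);    // released when pending reaches 0

    private ParallelQuickSort(int[] data, ExecutorService pool) {
        this.data = data;
        this.pool = pool;
    }

    /** Sorts the given array in place. */
    static void sort(int[] array) {
        int workers = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            ParallelQuickSort job = new ParallelQuickSort(array, pool);
            job.submit(0, array.length - 1);
            job.done.await(); // the caller only watches for completion of all jobs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Queues a job for the inclusive range [lo, hi]. */
    private void submit(int lo, int hi) {
        pending.incrementAndGet(); // count the job before it can possibly finish
        pool.execute(() -> run(lo, hi));
    }

    private void run(int lo, int hi) {
        try {
            if (hi - lo < CUTOFF) {
                Arrays.sort(data, lo, hi + 1); // small range: sort sequentially
            } else {
                int p = partition(lo, hi);
                submit(lo, p);      // delegate both sides...
                submit(p + 1, hi);  // ...and return without waiting for them
            }
        } finally {
            if (pending.decrementAndGet() == 0) {
                done.countDown(); // last job finished
            }
        }
    }

    /** Hoare partition: afterwards every element in [lo, p] is <= every element in [p + 1, hi]. */
    private int partition(int lo, int hi) {
        int pivot = data[lo + (hi - lo) / 2];
        int i = lo - 1, j = hi + 1;
        while (true) {
            do { i++; } while (data[i] < pivot);
            do { j--; } while (data[j] > pivot);
            if (i >= j) return j;
            int tmp = data[i];
            data[i] = data[j];
            data[j] = tmp;
        }
    }

    public static void main(String[] args) {
        int[] array = {4, 1, 3, 2};
        sort(array);
        System.out.println(Arrays.toString(array));
    }
}
```

Because no job ever blocks on another job, a bounded pool cannot deadlock here, and there is no merge step to schedule after the sub-ranges complete.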
