如何阻塞直到异步作业完成

发布于 2024-07-09 05:11:27 字数 1497 浏览 10 评论 0原文

我正在开发一个 C# 库,该库使用 NVIDIA 的 CUDA 将某些工作任务卸载到 GPU。 一个示例是使用扩展方法将两个数组添加在一起:

float[] a = new float[]{ ... }
float[] b = new float[]{ ... }
float[] c = a.Add(b);

此代码中的工作是在 GPU 上完成的。 但是,我希望它能够异步完成,这样只有当需要结果时,代码才会在 CPU 块上运行(如果结果尚未在 GPU 上完成)。 为此,我创建了一个隐藏异步执行的 ExecutionResult 类。 在使用中,如下所示:

float[] a = new float[]{ ... }
float[] b = new float[]{ ... }
ExecutionResult res = a.Add(b);
float[] c = res; //Implicit converter

在最后一行,如果数据尚未准备好,则程序将阻塞。 我不确定在 ExecutionResult 类中实现这种阻塞行为的最佳方法,因为我对同步线程和此类事情不太有经验。

public class ExecutionResult<T>
{
    private T[] result;
    private long computed = 0;

    internal ExecutionResult(T[] a, T[] b, Action<T[], T[], Action<T[]>> f)
    {
        f(a, b, UpdateData); //Asych call - 'UpdateData' is the callback method
    }

    internal void UpdateData(T[] data)
    {
        if (Interlocked.Read(ref computed) == 0)
        {
            result = data;
            Interlocked.Exchange(ref computed, 1);
        }
    }

    public static implicit operator T[](ExecutionResult<T> r)
    {
        //This is obviously a stupid way to do it
        while (Interlocked.Read(ref r.computed) == 0)
        {
            Thread.Sleep(1);
        }

        return result;
    }
}

传递给构造函数的 Action 是一个异步方法,它在 GPU 上执行实际工作。 嵌套的Action就是异步回调方法。

我主要关心的是如何最好/最优雅地处理转换器中完成的等待,以及是否有更合适的方法来解决整个问题。 如果有什么我需要详细说明或进一步解释的,请发表评论。

I'm working on a C# library which offloads certain work tasks to the GPU using NVIDIA's CUDA. An example of this is adding two arrays together using extension methods:

float[] a = new float[]{ ... }
float[] b = new float[]{ ... }
float[] c = a.Add(b);

The work in this code is done on the GPU. However, I would like it to be done asynchronously such that only when the result is needed will the code running on the CPU block (if the result is not finished on the GPU yet). To do this I've created an ExecutionResult class which hides the asynchronous execution. In use this looks as follows:

float[] a = new float[]{ ... }
float[] b = new float[]{ ... }
ExecutionResult res = a.Add(b);
float[] c = res; //Implicit converter

At the last line the program blocks if the data is done ready yet. I'm not certain of the best way to implement this blocking behavior inside the ExecutionResult class as I'm not very experienced with synchronizing threads and those sorts of things.

public class ExecutionResult<T>
{
    private T[] result;
    private long computed = 0;

    internal ExecutionResult(T[] a, T[] b, Action<T[], T[], Action<T[]>> f)
    {
        f(a, b, UpdateData); //Asych call - 'UpdateData' is the callback method
    }

    internal void UpdateData(T[] data)
    {
        if (Interlocked.Read(ref computed) == 0)
        {
            result = data;
            Interlocked.Exchange(ref computed, 1);
        }
    }

    public static implicit operator T[](ExecutionResult<T> r)
    {
        //This is obviously a stupid way to do it
        while (Interlocked.Read(ref r.computed) == 0)
        {
            Thread.Sleep(1);
        }

        return result;
    }
}

The Action passed to the constructor is an asynchronous method which performs the actual work on the GPU. The nested Action is the asynchronous callback method.

My main concern is how to best/most elegantly handle the waiting done in the converter but also if there are more appropriate ways to attack the problem as a whole. Just leave a comment if there is something I need to elaborate or explain further.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

万劫不复 2024-07-16 05:11:27

我不清楚您正在实现多少这是一个框架以及您调用了多少其他代码,但我会遵循 .NET 中的“正常”异步模式尽可能。

It's not clear to me how much this is a framework you're implementing and how much you're calling into other code, but I would follow the "normal" async pattern in .NET as far as possible.

煞人兵器 2024-07-16 05:11:27

我发现该问题的解决方案是将一个函数传递给 ExecutionResult 构造函数,该构造函数执行两件事。 运行时,它启动异步工作,此外,它还返回另一个返回所需结果的函数:

private Func<T[]> getResult;

internal ExecutionResult(T[] a, T[] b, Func<T[], T[], Func<T[]>> asynchBinaryFunction)
{
   getResult = asynchUnaryFunction(a);
}

public static implicit operator T[](ExecutionResult<T> r)
{
    return r.getResult();
}

“getResult”函数会阻塞,直到计算完数据并从 GPU 获取数据为止。 这非常适合 CUDA 驱动程序 API 的结构。

这是一个非常干净和简单的解决方案。 由于 C# 允许通过访问本地作用域来创建匿名函数,因此只需替换传递给 ExecutionResult 构造函数的方法的阻塞部分,这样......

    ...

    status = LaunchGrid(func, length);

    //Fetch result
    float[] c = new float[length];
    status = CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize);
    status = Free(ptrA, ptrB);

    return c;
}

就变成......

    ...

    status = LaunchGrid(func, length);

    return delegate
    {
        float[] c = new float[length];
        CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize); //Blocks until work is done
        Free(ptrA, ptrB);
        return c;
    };
}

The solution I found to the problem is to pass a function to the ExecutionResult constructor which does two things. When run, it starts the asynchronous work and in addition, it returns another function which returns the desired result:

private Func<T[]> getResult;

internal ExecutionResult(T[] a, T[] b, Func<T[], T[], Func<T[]>> asynchBinaryFunction)
{
   getResult = asynchUnaryFunction(a);
}

public static implicit operator T[](ExecutionResult<T> r)
{
    return r.getResult();
}

The 'getResult' function blocks until the data has been calculated and fetched from the GPU. This works well with how the CUDA driver API is structured.

It is a quite clean and simple solution. Since C# allows anonymous functions to be created with access to the local scope it is simply a matter of replacing the blocking part of a method passed to the ExecutionResult constructor such that...

    ...

    status = LaunchGrid(func, length);

    //Fetch result
    float[] c = new float[length];
    status = CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize);
    status = Free(ptrA, ptrB);

    return c;
}

becomes...

    ...

    status = LaunchGrid(func, length);

    return delegate
    {
        float[] c = new float[length];
        CUDADriver.cuMemcpyDtoH(c, ptrA, byteSize); //Blocks until work is done
        Free(ptrA, ptrB);
        return c;
    };
}
沦落红尘 2024-07-16 05:11:27

我想知道你是否不能在这里使用常规的 Delegate.BeginInvoke/Delegate.EndInvoke ? 如果没有,那么等待句柄(例如 ManualResetEvent)可能是一个选项:

using System.Threading;
static class Program {
    static void Main()
    {
        ThreadPool.QueueUserWorkItem(DoWork);

        System.Console.WriteLine("Main: waiting");
        wait.WaitOne();
        System.Console.WriteLine("Main: done");
    }
    static void DoWork(object state)
    {
        System.Console.WriteLine("DoWork: working");
        Thread.Sleep(5000); // simulate work
        System.Console.WriteLine("DoWork: done");
        wait.Set();
    }
    static readonly ManualResetEvent wait = new ManualResetEvent(false);

}

请注意,如果您确实需要,可以仅使用对象来执行此操作:

using System.Threading;
static class Program {
    static void Main()
    {
        object syncObj = new object();
        lock (syncObj)
        {
            ThreadPool.QueueUserWorkItem(DoWork, syncObj);

            System.Console.WriteLine("Main: waiting");
            Monitor.Wait(syncObj);
            System.Console.WriteLine("Main: done");
        }
    }
    static void DoWork(object syncObj)
    {

        System.Console.WriteLine("DoWork: working");
        Thread.Sleep(5000); // simulate work
        System.Console.WriteLine("DoWork: done");
        lock (syncObj)
        {
            Monitor.Pulse(syncObj);
        }
    }

}

I wonder if you couldn't use the regular Delegate.BeginInvoke/Delegate.EndInvoke here? If not, then a wait handle (such as a ManualResetEvent) might be an option:

using System.Threading;
static class Program {
    static void Main()
    {
        ThreadPool.QueueUserWorkItem(DoWork);

        System.Console.WriteLine("Main: waiting");
        wait.WaitOne();
        System.Console.WriteLine("Main: done");
    }
    static void DoWork(object state)
    {
        System.Console.WriteLine("DoWork: working");
        Thread.Sleep(5000); // simulate work
        System.Console.WriteLine("DoWork: done");
        wait.Set();
    }
    static readonly ManualResetEvent wait = new ManualResetEvent(false);

}

Note that you can do this just using object if you really want:

using System.Threading;
static class Program {
    static void Main()
    {
        object syncObj = new object();
        lock (syncObj)
        {
            ThreadPool.QueueUserWorkItem(DoWork, syncObj);

            System.Console.WriteLine("Main: waiting");
            Monitor.Wait(syncObj);
            System.Console.WriteLine("Main: done");
        }
    }
    static void DoWork(object syncObj)
    {

        System.Console.WriteLine("DoWork: working");
        Thread.Sleep(5000); // simulate work
        System.Console.WriteLine("DoWork: done");
        lock (syncObj)
        {
            Monitor.Pulse(syncObj);
        }
    }

}
悲凉≈ 2024-07-16 05:11:27

使用 cudaThreadSyncronize() 或 memcpy() 您可以执行同步操作 - 适合 Invoke()。

CUDA 还允许您使用 callAsync() /sync() 请求异步内存传输 - 适合使用 callAsync() 的 Begin/EndInvoke()。

Using cudaThreadSyncronize() or memcpy() you can preform synchronous operations - suitable for Invoke().



CUDA also lets you request an asynchronic memory transfer using callAsync() / sync() - suitable for Begin/EndInvoke() using callAsync().

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文