.net 4.0 任务:在一个或多个对象上同步

发布于 2024-10-11 13:54:52 字数 4196 浏览 1 评论 0原文

我已经阅读了很多有关 .net 4.0 中新任务功能的内容,但我还没有找到以下问题的解决方案:

我正在编写一个处理来自许多用户的请求的服务器应用程序,并且我想使用任务来分发这些请求在多个核心上。然而,这些任务应该在对象上同步(对于一开始的用户来说),以便一次只为每个对象处理一个任务。使用 Task.ContinueWith() 可以很容易地实现这一点,但也应该可以在多个对象上同步任务(例如,当用户向另一个用户转账时,变量应该在用户 A 处递减,在用户 B 处递增)没有其他任务干扰)。

因此,我的第一次尝试是一个接收委托、创建任务并将其存储在字典中的类,其中要同步的对象作为键。如果计划了一个新任务,可以使用 Task.ContinueWith() 将其附加到给定对象的最后一个任务。如果需要在多个对象上同步,则使用 TaskFactory.ContinueWhenAll() 创建新任务。创建的任务存储在与其同步的每个对象的字典中。 这是我的初稿:

 public class ActionScheduler:IActionScheduler
{
    private readonly IDictionary<object, Task> mSchedulingDictionary = new Dictionary<object, Task>();
    private readonly TaskFactory mTaskFactory = new TaskFactory();

    /// <summary>
    /// Schedules actions synchonized on one or more objects. Only one action will be processed for each object at any time.
    /// </summary>
    /// <param name="synchronisationObjects">Array of objects the current action is synchronized on</param>
    /// <param name="action">The action that will be scheduled and processed</param>
    public void ScheduleTask(object[] synchronisationObjects, Action action)
    {            
        // lock the dictionary in case two actions are scheduled on the same object at the same time
        // this is necessary since reading and writing to a dictionary can not be done in an atomic manner
        lock(mSchedulingDictionary)
        {
            // get all current tasks for the given synchronisation objects
            var oldTaskList = new List<Task>();
            foreach (var syncObject in synchronisationObjects)
            {
                Task task;
                mSchedulingDictionary.TryGetValue(syncObject, out task);
                if (task != null)
                    oldTaskList.Add(task);
            }

            // create a new task for the given action
            Task newTask;
            if (oldTaskList.Count > 1)
            {
                // task depends on multiple previous tasks
                newTask = mTaskFactory.ContinueWhenAll(oldTaskList.ToArray(), t => action());
            }
            else
            {
                if (oldTaskList.Count == 1)
                {
                    // task depends on exactly one previous task
                    newTask = oldTaskList[0].ContinueWith(t => action());
                }
                else
                {
                    // task does not depend on any previous task and can be started immediately
                    newTask = new Task(action);
                    newTask.Start();
                }
            }

            // store the task in the dictionary
            foreach (var syncObject in synchronisationObjects)
            {
                mSchedulingDictionary[syncObject] = newTask;
            }
        }
    }
}

如果为多个对象创建任务“multiSyncTask”,然后为每个对象安排任务,这甚至可以工作。由于它们都是使用 multiSyncTask.ContinueWith() 创建的,因此它们同步启动:

static void Main()
    {
        IActionScheduler actionScheduler = new ActionScheduler();

        var syncObj1 = new object();
        var syncObj2 = new object();

        // these two start and complete simultaneously:
        actionScheduler.ScheduleTask(new[] { syncObj1 }, () => PrintTextAfterWait("1"));
        actionScheduler.ScheduleTask(new[] { syncObj2 }, () => PrintTextAfterWait("2"));
        // this task starts after the first two and "locks" both objects:
        actionScheduler.ScheduleTask(new[] { syncObj1, syncObj2 }, () => PrintTextAfterWait("1 and 2"));
        // these two - again - start and complete simultaneously after the task above:
        actionScheduler.ScheduleTask(new[] { syncObj1 }, () => PrintTextAfterWait("1"));
        actionScheduler.ScheduleTask(new[] { syncObj2 }, () => PrintTextAfterWait("2"));
    }

    static void PrintTextAfterWait(string text)
    {
        Thread.Sleep(3000);
        Console.WriteLine(text);
    }

您认为这对我的问题来说是一个好的解决方案吗?我对字典上的大锁有点怀疑,但如果同时在一个对象上安排两个任务以防止竞争条件,这是必要的。当然,字典只是在创建任务时被锁定,而不是在处理任务时被锁定。

另外,我很想知道是否有任何现有的解决方案或编码范例可以使用我未能找到的 .net 4.0 任务更好地解决我的问题。

谢谢您并致以最诚挚的问候, 约翰内斯

I have read a lot about the new Task functionality in .net 4.0, but I haven't found a solution for the following problem:

I am writing a server application that processes requests from many users and I want to use Tasks to distribute these request on multiple cores. However, these Tasks should be synchronized on objects - for the beginning, users -, so that just one task is processed for each object at a time. This would be simple to achieve with Task.ContinueWith(), but it should also be possible to synchonize a task on multiple objects (e.g. when a user transfers money to another user, a variable should be decremented at user A and incremented at user B without other tasks interfering).

So, my first attempt is a class that receives delegates, creates tasks and stores them in a dictionary with the objects to sync on as keys. If a new task is scheduled, it can be appended to the last task of the given object with Task.ContinueWith(). If it should be synchronized on multiple objects, the new Task is created using TaskFactory.ContinueWhenAll(). The created task is stored in the dictionary for every object it is synchronized on.
Here is my first draft:

 public class ActionScheduler:IActionScheduler
{
    private readonly IDictionary<object, Task> mSchedulingDictionary = new Dictionary<object, Task>();
    private readonly TaskFactory mTaskFactory = new TaskFactory();

    /// <summary>
    /// Schedules actions synchonized on one or more objects. Only one action will be processed for each object at any time.
    /// </summary>
    /// <param name="synchronisationObjects">Array of objects the current action is synchronized on</param>
    /// <param name="action">The action that will be scheduled and processed</param>
    public void ScheduleTask(object[] synchronisationObjects, Action action)
    {            
        // lock the dictionary in case two actions are scheduled on the same object at the same time
        // this is necessary since reading and writing to a dictionary can not be done in an atomic manner
        lock(mSchedulingDictionary)
        {
            // get all current tasks for the given synchronisation objects
            var oldTaskList = new List<Task>();
            foreach (var syncObject in synchronisationObjects)
            {
                Task task;
                mSchedulingDictionary.TryGetValue(syncObject, out task);
                if (task != null)
                    oldTaskList.Add(task);
            }

            // create a new task for the given action
            Task newTask;
            if (oldTaskList.Count > 1)
            {
                // task depends on multiple previous tasks
                newTask = mTaskFactory.ContinueWhenAll(oldTaskList.ToArray(), t => action());
            }
            else
            {
                if (oldTaskList.Count == 1)
                {
                    // task depends on exactly one previous task
                    newTask = oldTaskList[0].ContinueWith(t => action());
                }
                else
                {
                    // task does not depend on any previous task and can be started immediately
                    newTask = new Task(action);
                    newTask.Start();
                }
            }

            // store the task in the dictionary
            foreach (var syncObject in synchronisationObjects)
            {
                mSchedulingDictionary[syncObject] = newTask;
            }
        }
    }
}

This even works if a task "multiSyncTask" was created for multiple objects, and afterwards tasks for each of the objects are scheduled. Since they are all created with multiSyncTask.ContinueWith(), they start synchronously:

static void Main()
    {
        IActionScheduler actionScheduler = new ActionScheduler();

        var syncObj1 = new object();
        var syncObj2 = new object();

        // these two start and complete simultaneously:
        actionScheduler.ScheduleTask(new[] { syncObj1 }, () => PrintTextAfterWait("1"));
        actionScheduler.ScheduleTask(new[] { syncObj2 }, () => PrintTextAfterWait("2"));
        // this task starts after the first two and "locks" both objects:
        actionScheduler.ScheduleTask(new[] { syncObj1, syncObj2 }, () => PrintTextAfterWait("1 and 2"));
        // these two - again - start and complete simultaneously after the task above:
        actionScheduler.ScheduleTask(new[] { syncObj1 }, () => PrintTextAfterWait("1"));
        actionScheduler.ScheduleTask(new[] { syncObj2 }, () => PrintTextAfterWait("2"));
    }

    static void PrintTextAfterWait(string text)
    {
        Thread.Sleep(3000);
        Console.WriteLine(text);
    }

What do you think - is this a good solution for my problem? I am a bit sceptic about the big lock on the dictionary, but it is necessary in case two tasks are scheduled on one object at once to prevent race conditions. Of course, the dictionary is just locked for the time it takes to create a task, not when it is processed.

Also, I would love to know if there are any already existing solutions or coding paradigms out there that solve my problem better using .net 4.0 Tasks that I have failed to track down.

Thank you and with best regards,
Johannes

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

深海里的那抹蓝 2024-10-18 13:54:52

如果我没猜错的话..你想要一个 Task.ContinueWith(task1, task2, lambda) 吗?
类似于 CCR 中的 Join 仲裁器?
http://msdn.microsoft.com/en-us/library/bb648749.aspx
如果是这样,最优雅的选择可能是在 TPL 数据流中使用 JoinBlock (http://www.microsoft.com/download/en/confirmation.aspx?id=14782)。
或者,也许您是否尝试过使用 Task.WaitAll() 作为依赖任务的第一条指令?

If I got you right.. you would like to have a Task.ContinueWith(task1, task2, lambda)?
Something like the Join arbiter in CCR?
http://msdn.microsoft.com/en-us/library/bb648749.aspx
If so, probably the most elegant option is to use the JoinBlock in TPL dataflow (http://www.microsoft.com/download/en/confirmation.aspx?id=14782).
Or, maybe, have you tried to do use Task.WaitAll() as the first instruction of your dependent task?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文