具有变体的生产者-消费者 - 如何与线程信号/等待同步?

发布于 2024-10-04 01:34:10 字数 1379 浏览 3 评论 0原文

在从事一个大型项目时,我意识到我要打很多电话来安排未来的工作。由于它们相当轻量,我认为使用单独的调度程序可能会更好。

ThreadPool.QueueUserWorkItem (() => 
{
    Thread.Sleep (5000);
    Foo (); // Call is to be executed after sometime
});

因此,我创建了一个单独的调度程序类,它在自己的线程上运行并执行这些事件。我有两个函数可以从单独的线程访问共享队列。我会使用锁,但由于其中一个线程需要睡眠等待,我不确定如何释放锁。

class Scheduler
{
    SortedDictionary <DateTime, Action> _queue;
    EventWaitHandle _sync;

    // Runs on its own thread
    void Run ()
    {
        while (true)
        {
            // Calculate time till first event
            // If queue empty, use pre-defined value
            TimeSpan timeDiff = _queue.First().Key - DateTime.Now;

            // Execute action if in the next 100ms
            if (timeDiff < 100ms)
                ...
            // Wait on event handle for time
            else
                _sync.WaitOne (timeDiff);
        }
    }

    // Can be called by any thread
    void ScheduleEvent (Action action, DataTime time)
    {
        _queue.Add (time, action);
        // Signal thread to wake up and check again
        _sync.Set ();
    }
}

  • 问题是,我不确定如何在两个函数之间同步对队列的访问。我无法使用监视器或互斥锁,因为 Run() 会睡眠等待,从而导致死锁。这里使用什么是正确的同步机制? (如果有一种机制可以自动启动睡眠等待进程并立即释放锁,那可能会解决我的问题)
  • 如何验证不存在竞争条件?
  • 这是生产者消费者问题的变体,还是有更相关的同步问题描述?

    虽然这在某种程度上面向 C#,但我很高兴听到对此的通用解决方案。谢谢!

  • While working on a large project I realized I was making a lot of calls to be scheduled in the future. Since these were fairly light-weight, I thought it might be better to use a separate scheduler.

    ThreadPool.QueueUserWorkItem (() => 
    {
        Thread.Sleep (5000);
        Foo (); // Call is to be executed after sometime
    });
    

    So I created a separate scheduler class that runs on its own thread and executes these events. I have 2 functions that access a shared queue from separate threads. I'd use a lock, but since one of the threads needs to sleep-wait, I wasn't sure how to release the lock.

    class Scheduler
    {
        SortedDictionary <DateTime, Action> _queue;
        EventWaitHandle _sync;
    
        // Runs on its own thread
        void Run ()
        {
            while (true)
            {
                // Calculate time till first event
                // If queue empty, use pre-defined value
                TimeSpan timeDiff = _queue.First().Key - DateTime.Now;
    
                // Execute action if in the next 100ms
                if (timeDiff < 100ms)
                    ...
                // Wait on event handle for time
                else
                    _sync.WaitOne (timeDiff);
            }
        }
    
        // Can be called by any thread
        void ScheduleEvent (Action action, DataTime time)
        {
            _queue.Add (time, action);
            // Signal thread to wake up and check again
            _sync.Set ();
        }
    }
    
  • The trouble is, I'm not sure how to synchronize access to the queue between the 2 functions. I can't use a monitor or mutex, because Run() will sleep-wait, thus causing a deadlock. What is the right synchronization mechanism to use here? (If there a mechanism to atomically start the sleep-wait process and immediately release the lock, that might solve my problem)
  • How can I verify there is no race-condition?
  • Is this a variation of the producer consumer problem, or is there a more relevant synchronization problem-description?

    While this is somewhat geared towards C#, I'd be happy to hear a general solution to this. Thanks!

  • 如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

    扫码二维码加入Web技术交流群

    发布评论

    需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

    评论(4

    沧笙踏歌 2024-10-11 01:34:10

    好的,用监视器/脉冲取 2。

        void Run ()    
        {
            while (true)
            {
                Action doit = null;
    
                lock(_queueLock)
                {
                    while (_queue.IsEmpty())
                        Monitor.Wait(_queueLock);
    
                    TimeSpan timeDiff = _queue.First().Key - DateTime.Now;
                    if (timeDiff < 100ms)
                        doit = _queue.Dequeue();
                }
    
                if (doit != null)
                    ; //execute doit
                else
                 _sync.WaitOne (timeDiff);  
            }
        }
    
    
    void ScheduleEvent (Action action, DataTime time)
    {
        lock (_queueLock)
        {
            _queue.Add(time, action);
            // Signal thread to wake up and check again
            _sync.Set ();
            if (_queue.Count == 1)
                 Monitor.Pulse(_queuLock);
        }
    }
    

    OK, take 2 with Monitor/Pulse.

        void Run ()    
        {
            while (true)
            {
                Action doit = null;
    
                lock(_queueLock)
                {
                    while (_queue.IsEmpty())
                        Monitor.Wait(_queueLock);
    
                    TimeSpan timeDiff = _queue.First().Key - DateTime.Now;
                    if (timeDiff < 100ms)
                        doit = _queue.Dequeue();
                }
    
                if (doit != null)
                    ; //execute doit
                else
                 _sync.WaitOne (timeDiff);  
            }
        }
    
    
    void ScheduleEvent (Action action, DataTime time)
    {
        lock (_queueLock)
        {
            _queue.Add(time, action);
            // Signal thread to wake up and check again
            _sync.Set ();
            if (_queue.Count == 1)
                 Monitor.Pulse(_queuLock);
        }
    }
    
    汹涌人海 2024-10-11 01:34:10

    问题很容易解决,确保WaitOne在锁外。

      //untested
      while (true)
      {
          Action doit = null;
    
          // Calculate time till first event
          // If queue empty, use pre-defined value
          lock(_queueLock)
          {
             TimeSpan timeDiff = _queue.First().Key - DateTime.Now;
             if (timeDiff < 100ms)
                doit = _queue.Dequeue();
          }
          if (doit != null)
            // execute it
          else
             _sync.WaitOne (timeDiff);
      }
    

    _queueLock 是一个私有辅助对象。

    The problem is easily solved, make sure the WaitOne is outside the lock.

      //untested
      while (true)
      {
          Action doit = null;
    
          // Calculate time till first event
          // If queue empty, use pre-defined value
          lock(_queueLock)
          {
             TimeSpan timeDiff = _queue.First().Key - DateTime.Now;
             if (timeDiff < 100ms)
                doit = _queue.Dequeue();
          }
          if (doit != null)
            // execute it
          else
             _sync.WaitOne (timeDiff);
      }
    

    _queueLock is a private helper object.

    禾厶谷欠 2024-10-11 01:34:10

    既然您的目标是在特定时间段后安排任务,为什么不直接使用 System.Threading.Timer 呢?它不需要专用线程进行调度,并利用操作系统来唤醒工作线程。我已经使用过这个(删除了一些注释和其他计时器服务功能):

    public sealed class TimerService : ITimerService
    {
        public void WhenElapsed(TimeSpan duration, Action callback)
        {
            if (callback == null) throw new ArgumentNullException("callback");
    
            //Set up state to allow cleanup after timer completes
            var timerState = new TimerState(callback);
            var timer = new Timer(OnTimerElapsed, timerState, Timeout.Infinite, Timeout.Infinite);
            timerState.Timer = timer;
    
            //Start the timer
            timer.Change((int) duration.TotalMilliseconds, Timeout.Infinite);
        }
    
        private void OnTimerElapsed(Object state)
        {
            var timerState = (TimerState)state;
            timerState.Timer.Dispose();
            timerState.Callback();
        }
    
        private class TimerState
        {
            public Timer Timer { get; set; }
    
            public Action Callback { get; private set; }
    
            public TimerState(Action callback)
            {
                Callback = callback;
            }
        }
    }
    

    Since your goal is to schedule a task after a particular period of time, why not just use the System.Threading.Timer? It doesn't require dedicating a thread for the scheduling and takes advantage of the OS to wake up a worker thread. I've used this (removed some comments and other timer service functionality):

    public sealed class TimerService : ITimerService
    {
        public void WhenElapsed(TimeSpan duration, Action callback)
        {
            if (callback == null) throw new ArgumentNullException("callback");
    
            //Set up state to allow cleanup after timer completes
            var timerState = new TimerState(callback);
            var timer = new Timer(OnTimerElapsed, timerState, Timeout.Infinite, Timeout.Infinite);
            timerState.Timer = timer;
    
            //Start the timer
            timer.Change((int) duration.TotalMilliseconds, Timeout.Infinite);
        }
    
        private void OnTimerElapsed(Object state)
        {
            var timerState = (TimerState)state;
            timerState.Timer.Dispose();
            timerState.Callback();
        }
    
        private class TimerState
        {
            public Timer Timer { get; set; }
    
            public Action Callback { get; private set; }
    
            public TimerState(Action callback)
            {
                Callback = callback;
            }
        }
    }
    
    转角预定愛 2024-10-11 01:34:10

    监视器是为这种情况创建的,简单的问题可能会给应用程序带来很大的成本,我对此的解决方案非常简单,如果您想让关闭变得容易实现:

        void Run()
        {
          while(true)
             lock(this)
             {
                int timeToSleep = getTimeToSleep() //check your list and return a value
                if(timeToSleep <= 100) 
                    action...
                else
                {
    
                   int currTime = Datetime.Now;
                   int currCount = yourList.Count;
                   try{
                   do{
                     Monitor.Wait(this,timeToSleep);
    
                     if(Datetime.now >= (tomeToSleep + currtime))
                          break; //time passed
    
                     else if(yourList.Count != currCount)
                        break; //new element added go check it
                     currTime = Datetime.Now;
                   }while(true);
                }
                }catch(ThreadInterruptedException e)
                {
                    //do cleanup code or check for shutdown notification
                }
             }
          }
        }
    
    void ScheduleEvent (Action action, DataTime time)
    {
        lock(this)
        {
           yourlist.add ...
           Monitor.Pulse(this);
    

    }
    }

    The monitores were created for this kind of situation, simple problems that can cost mutch for the application, i present my solution to this very simple and if u want to make a shutdown easy to implement:

        void Run()
        {
          while(true)
             lock(this)
             {
                int timeToSleep = getTimeToSleep() //check your list and return a value
                if(timeToSleep <= 100) 
                    action...
                else
                {
    
                   int currTime = Datetime.Now;
                   int currCount = yourList.Count;
                   try{
                   do{
                     Monitor.Wait(this,timeToSleep);
    
                     if(Datetime.now >= (tomeToSleep + currtime))
                          break; //time passed
    
                     else if(yourList.Count != currCount)
                        break; //new element added go check it
                     currTime = Datetime.Now;
                   }while(true);
                }
                }catch(ThreadInterruptedException e)
                {
                    //do cleanup code or check for shutdown notification
                }
             }
          }
        }
    
    void ScheduleEvent (Action action, DataTime time)
    {
        lock(this)
        {
           yourlist.add ...
           Monitor.Pulse(this);
    

    }
    }

    ~没有更多了~
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文