使用阻塞收集和任务的经典生产者消费者模式 .net 4 TPL

发布于 2024-11-17 09:01:10 字数 1324 浏览 2 评论 0原文

请参阅下面的伪代码,

//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    {
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (!concurrentQueue.IsEmpty())
        {
            if (concurrentQueue.TrydeQueue(out item))
            {
                //long running processing of item
            }
        }
        consumerSignal.WaitOne();
    }

如何移植我自古以来就使用的这个模式,以使用 taskfactory 创建的任务和 net 4 的新信号功能。换句话说,如果有人使用 net 4 编写这个模式,它会是什么样子?伪代码没问题。如您所见,我已经在使用 .net 4并发队列。如果可能的话,我如何使用任务并可能使用一些更新的信号机制。感谢

Jon/Dan 解决了我下面的问题。甜的。 没有像过去那样的手动信号或 while(true) 或 while(itemstoProcess) 类型循环

//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 {
     blockingCollection.add(item);
 }

 //somewhere else we have started a consumer like this
 //this supports multiple consumers !
 task(StartConsuming()).Start; 

 void StartConsuming()
 {
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     {
                //long running processing of item
     }
 }

cancellations are handled using cancel tokens

Please see below pseudo code

//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    {
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (!concurrentQueue.IsEmpty())
        {
            if (concurrentQueue.TrydeQueue(out item))
            {
                //long running processing of item
            }
        }
        consumerSignal.WaitOne();
    }

How do I port this pattern I have used since time immemorial to use taskfactory created tasks and the new signalling features of net 4. In other words if someone were to write this pattern using net 4 how would it look like ? Pseudo code is fine. Iam already using .net 4 concurrentQueue as you can see. How do I use a task and possibly use some newer signalling mechanism if possible. thanks

Solution to my problem below thanks to Jon/Dan. Sweet.
No manual signalling or while(true) or while(itemstoProcess) type loops like the old days

//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 {
     blockingCollection.add(item);
 }

 //somewhere else we have started a consumer like this
 //this supports multiple consumers !
 task(StartConsuming()).Start; 

 void StartConsuming()
 {
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     {
                //long running processing of item
     }
 }

cancellations are handled using cancel tokens

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

鸢与 2024-11-24 09:01:10

您可以使用 BlockingCollection。文档中有一个示例。

这个类是专门为了让这个变得简单而设计的。

You would use BlockingCollection<T>. There's an example in the documentation.

That class is specifically designed to make this trivial.

忆梦 2024-11-24 09:01:10

你的第二个代码块看起来更好。但是,启动一个任务然后立即等待它是没有意义的。只需调用 Take ,然后处理直接在消费线程上返回的项目即可。这就是生产者-消费者模式的目的。如果您认为工作项的处理足够密集,足以保证更多的消费者,那么无论如何都要启动更多的消费者。 BlockingCollection 对于多个生产者和多个消费者来说是安全的。

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}

Your second block of code looks better. But, starting a Task and then immediately waiting on it is pointless. Just call Take and then process the item that is returned directly on the consuming thread. That is how the producer-consumer pattern is meant to be done. If you think the processing of work items is intensive enough to warrant more consumers then by all means start more consumers. BlockingCollection is safe multiple producers and multiple consumers.

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}
情释 2024-11-24 09:01:10

我之前使用过一种模式,创建了一种“按需”队列消费者(基于从 ConcurrentQueue 消费):

        private void FireAndForget(Action fire)
        {
            _firedEvents.Enqueue(fire);
            lock (_taskLock)
            {
                if (_launcherTask == null)
                {
                    _launcherTask = new Task(LaunchEvents);
                    _launcherTask.ContinueWith(EventsComplete);
                    _launcherTask.Start();
                }
            }
        }

        private void LaunchEvents()
        {
            Action nextEvent;

            while (_firedEvents.TryDequeue(out nextEvent))
            {
                if (_synchronized)
                {
                    var syncEvent = nextEvent;
                    _mediator._syncContext.Send(state => syncEvent(), null);
                }
                else
                {
                    nextEvent();                        
                }

                lock (_taskLock)
                {
                    if (_firedEvents.Count == 0)
                    {
                        _launcherTask = null;
                        break;
                    }
                }
            }
        }

        private void EventsComplete(Task task)
        {
            if (task.IsFaulted && task.Exception != null)
            {
                 // Do something with task Exception here
            }
        }

I've used a pattern before that creates a sort of 'on-demand' queue consumer (based on consuming from a ConcurrentQueue):

        private void FireAndForget(Action fire)
        {
            _firedEvents.Enqueue(fire);
            lock (_taskLock)
            {
                if (_launcherTask == null)
                {
                    _launcherTask = new Task(LaunchEvents);
                    _launcherTask.ContinueWith(EventsComplete);
                    _launcherTask.Start();
                }
            }
        }

        private void LaunchEvents()
        {
            Action nextEvent;

            while (_firedEvents.TryDequeue(out nextEvent))
            {
                if (_synchronized)
                {
                    var syncEvent = nextEvent;
                    _mediator._syncContext.Send(state => syncEvent(), null);
                }
                else
                {
                    nextEvent();                        
                }

                lock (_taskLock)
                {
                    if (_firedEvents.Count == 0)
                    {
                        _launcherTask = null;
                        break;
                    }
                }
            }
        }

        private void EventsComplete(Task task)
        {
            if (task.IsFaulted && task.Exception != null)
            {
                 // Do something with task Exception here
            }
        }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文