非阻塞并发收集?

发布于 2024-09-10 11:00:21 字数 598 浏览 3 评论 0原文

System.Collections.Concurrent 有一些在多线程环境中运行良好的新集合。然而,它们有点有限。它们要么阻塞,直到某个项目可用,要么返回 default(T) (TryXXX 方法)。

我需要一个线程安全的集合,但它不会阻塞调用线程,而是使用回调来通知我至少有一个项目可用。

我当前的解决方案是使用 BlockingCollection,但使用带有委托的 APM 来获取下一个元素。换句话说,我创建了一个从集合中获取的方法的委托,并使用BeginInvoke 执行该委托。

不幸的是,为了实现这一目标,我必须在班级中保留很多状态。更糟糕的是,该类不是线程安全的;它只能由单个线程使用。我正在回避可维护性的边缘,我不想这样做。

我知道有一些库可以使我在这里所做的事情变得非常简单(我相信响应式框架就是其中之一),但我想在不添加框架版本 4 之外的任何引用的情况下实现我的目标。

是否有更好的模式可以使用,不需要外部参考来实现我的目标?


tl;dr:

是否有任何模式满足要求:

“我需要向集合发出信号,表明我已准备好接收下一个元素,并让集合在下一个元素到达时执行回调,而不需要任何线程被阻止。”

System.Collections.Concurrent has some new collections that work very well in multithreaded environments. However, they are a bit limited. Either they block until an item becomes available, or they return default(T) (TryXXX methods).

I'm needing a collection that is thread safe, but instead of blocking the calling thread it uses a callback to inform me that at least one item is available.

My current solution is to use a BlockingCollection, but to use the APM with a delegate to get the next element. In other words, I create a delegate to a method that Takes from the collection, and execute that delegate using BeginInvoke.

Unfortunately, I have to keep a lot of state within my class in order to accomplish this. Worse, the class is not thread safe; it can only be used by a single thread. I'm skirting the edge of maintainability, which I'd prefer not to do.

I know there are some libraries out there that make what I'm doing here pretty simple (I believe the Reactive Framework is one of these), but I'd like to accomplish my goals without adding any references outside of version 4 of the framework.

Are there any better patterns I can use that don't require outside references that accomplish my goal?


tl;dr:

Are there any patterns that satisfy the requirement:

"I need to signal a collection that I am ready for the next element, and have the collection execute a callback when that next element has arrived, without any threads being blocked."

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

如痴如狂 2024-09-17 11:00:21

我想我有两种可能的解决方案。我对这两者都不是特别满意,但它们至少提供了 APM 方法的合理替代方案。

第一个不满足您对无阻塞线程的要求,但我认为它相当优雅,因为您可以注册回调并且它们将以循环方式调用,但您仍然可以调用 TakeTryTake 就像您通常对 BlockingCollection 所做的那样。此代码强制在每次请求项目时注册回调。这就是集合的信号机制。这种方法的好处是对 Take 的调用不会像我的第二个解决方案中那样被饿死。

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

第二个确实满足您无阻塞线程的要求。请注意它如何将回调的调用传输到线程池。我这样做是因为我认为如果它同步执行,那么锁将保持更长时间,从而导致 AddRegisterForTake 出现瓶颈。我仔细查看了它,我认为它不会被实时锁定(项目和回调都可用,但回调永远不会被执行),但您可能需要自己查看以进行验证。这里唯一的问题是对 Take 的调用会被饿死,因为回调总是优先。

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}

I think I have two possible solutions. I am not particularly satisfied with either, but they do at least provide a reasonable alternative to the APM approach.

The first does not meet your requirement of no blocking thread, but I think it is rather elegant because you can register callbacks and they will get called in round-robin fashion, but you still have the ability to call Take or TryTake as you normally would for a BlockingCollection. This code forces callbacks to be registered each time an item is requested. That is the signalling mechanism for the collection. The nice thing about this approach is that calls to Take do not get starved as they do in my second solution.

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

The second does meet your requirement of no blocking thread. Notice how it transfers the invocation of the callback to the thread pool. I did this because I am thinking that if it got executed synchronously then the locks would be held longer resulting in the bottlenecking of Add and RegisterForTake. I have looked it over closely and I do not think it can get live locked (both an item and a callback are available, but the callback never gets executed) but you might want to look it over yourself to verify. The only problem here is that a call to Take would get starved as callbacks always take priority.

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}
Bonjour°[大白 2024-09-17 11:00:21

像这样的事情怎么样? (命名可能需要一些工作。请注意,这是未经测试的。)

public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}

How about something like this? (The naming could probably use some work. And note that this is untested.)

public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文