在队列上存在多个线程操作的情况下访问 ConcurrentQueue 的最新元素

发布于 2024-11-07 04:31:06 字数 1076 浏览 6 评论 0原文

我们有一个 ConcurrentQueue,用于在 3 个线程之间共享数据。线程 A 不断向队列填充数据。线程 B 旨在将此数据记录到文件中。线程 C 应该检索队列中最新的条目(或尽可能接近最新的条目),对其执行一些操作并将结果显示在屏幕上。

线程B为了及时对文件写操作进行集群,会做这样的事情:

if (cq.Count > 100)
{
    while (cq.Count > 1)
    {
        qElement = PopFromCq(cq); // PopFromCq uses cq.TryDequeue()
        bw.Write(qElement.data); // bw is a binary writer
    }
}
else
{
    System.Threading.Thread.Sleep(10);
}

即等待至少100个元素排队,然后将它们写入磁盘。但它始终在队列中维护至少一项,原因是我们希望线程 C 始终能够访问至少一项。

线程 C 中的循环如下所示:

while (threadsRunning) 
{
    System.Threading.Thread.Sleep(500); // Update twice per second
    ProcessDataAndUpdateScreen(cq.ElementAt(cq.Count - 1)); // our terrible attempt at looking at the latest (or close to latest) entry in the queue
}

在此循环中,由于将数据写入磁盘的线程与 cq.ElementAt(cq.Count-1) 调用之间的竞争,我们有时会遇到异常。我相信发生的事情如下:

  1. cq.Count 计算为 90。
  2. 那时,线程 B 已经开始循环,并且正在将数据从队列中取出以写入磁盘
  3. 。 ) 被调用时,线程 B 消耗了许多项目,使得 (cq.Count - 1) 不再指向队列中的有效条目。

关于在队列上运行多个线程的情况下访问队列中最新条目的好方法是什么?

问候,

We have a ConcurrentQueue which is used to share data among 3 threads. Thread A continuously fills the queue with data. Thread B is designed to record this data to a file. Thread C is supposed to retrieve the youngest entry in the queue (or as close to youngest as possible), perform some operations on it and display results on the screen.

Thread B, in order to cluster the file write operations in time, does something like this:

if (cq.Count > 100)
{
    while (cq.Count > 1)
    {
        qElement = PopFromCq(cq); // PopFromCq uses cq.TryDequeue()
        bw.Write(qElement.data); // bw is a binary writer
    }
}
else
{
    System.Threading.Thread.Sleep(10);
}

ie, it waits for at least 100 elements to queue up, then writes them to the disk. It always maintains at least one item in the queue though and the reason is because we want Thread C to always have access to at least one item.

The loop in thread C looks like:

while (threadsRunning) 
{
    System.Threading.Thread.Sleep(500); // Update twice per second
    ProcessDataAndUpdateScreen(cq.ElementAt(cq.Count - 1)); // our terrible attempt at looking at the latest (or close to latest) entry in the queue
}

In this loop, we sometimes get an exception due to the race between the thread that writes the data to disk, and the cq.ElementAt(cq.Count-1) call. I believe what is happening is as follows:

  1. cq.Count is calculated to be, say 90.
  2. By that time, Thread B already started its loop and it is dequeueing data from the queue to write to the disk
  3. By the time cq.ElementAt() is called, Thread B consumed a number of items such that (cq.Count - 1) no longer points to a valid entry in the queue.

Any ideas on what would be a nice way of accessing the youngest entry in the queue in presence of multiple threads operating on the queue?

Regards,

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

甜点 2024-11-14 04:31:06

AB通信和AC通信是否都需要经过队列?如果您让线程 A 将每个条目写入队列(供 B 读取和记录),并保存该条目,它只是在某处的易失性属性中排队。每次C想要获取最年轻的元素时,它都可以直接从该属性中读取。

编辑:您实际上应该使用 Interlocked.CompareExchange(T, T) 来设置和读取“最新条目”属性,而不是仅仅依赖于易失性属性。

Is it necessary for the A-B communication and A-C communication to both go through the queue? What if you have thread A write each entry to the queue (for B to read and log) and also save the entry it's just queued in a volatile property somewhere. Every time C wants to get the youngest element, it can just read directly from that property.

EDIT: Instead of just relying on a volatile property, you should actually use Interlocked.CompareExchange<T>(T, T) to set and read the "youngest entry" property.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文