ConcurrentDictionary 中的 ConcurrentQueue 重复错误

发布于 2025-01-13 01:13:56 字数 4735 浏览 0 评论 0 原文

我有一个线程处理每 10 秒接收的消息,并让另一个线程每分钟将这些消息写入数据库。 每条消息都有一个不同的发件人,在我的例子中名为 serialNumber

因此,我创建了一个如下所示的 ConcurrentDictionary。

public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

字典的键是serialNumber,值为1分钟消息的集合。 我想要收集一分钟数据的原因是每分钟一次而不是每 10 秒一次去数据库,这样我可以将过程减少 1/6 倍。

public class ShotManager
    {
        private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;
        private bool ACTIVE_FILE_DB_SHOOT_THREAD = false;
        private List<Devices> _devices = new List<Devices>();
        public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

        public ShotManager()
        {
            ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
            init();
        }
        private void init()
        {
            using (iotemplaridbContext dbContext = new iotemplaridbContext())
                _devices = (from d in dbContext.Devices select d).ToList();

            if (_dicAllPackets is null)
                _dicAllPackets = new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();

            foreach (var device in _devices)
            {
                if(!_dicAllPackets.ContainsKey(device.SerialNumber))
                    _dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel> { });
            }
        }
        public void Spinner()
        {
            while (ACTIVE_FILE_DB_SHOOT_THREAD)
            {
                try
                {
                    Parallel.ForEach(_dicAllPackets, devicePacket =>
                    {
                        Thread.Sleep(100);
                        readAndShot(devicePacket);
                    });
                    Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
                    //init();
                }
                catch (Exception ex)
                {
                    //init();
                    tLogger.EXC("Spinner exception for write...", ex);
                }
            }
        }
        public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
        {
            if (_dicAllPackets != null)
            {
                if (!_dicAllPackets.ContainsKey(serialNumber))
                    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
                else
                    _dicAllPackets[serialNumber].Enqueue(model);
            }
        }
        private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
        {
            StringBuilder sb = new StringBuilder();
            if (keyValuePair.Value.Count() <= 0)
            {
                return;
            }
            sb.AppendLine($"INSERT INTO ......) VALUES(");
//the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
            for (int i = 0; i < 10; i++)
            {
                keyValuePair.Value.TryDequeue(out PacketModel packet);
                if (packet != null)
                {
                    /*
                     *** do something and fill the sb...
                     */
                }
                else
                {
                    Console.WriteLine("No packet found! For Device: " + keyValuePair.Key);
                    break;
                }
            }
            insertIntoDB(sb.ToString()[..(sb.Length - 5)] + ";");
        }
    }

EnqueueObjectToQueue 调用者来自不同的类,如下所示。

private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
            PacketModel model = new PacketModel {
                MachineData = jsonPacket, 
                DataInsertedAt = messageTimeStamp
            };
            _shotManager.EnqueueObjectToQueue(serialNumber, model);
}

我如何调用上述函数是来自处理函数本身。

private void messageReceiveHandler(object sender, MessageReceviedEventArgs e){
   //do something...parse from e and call the func
   string jsonPacket = ""; //something parsed from e 
   string serialNumber = ""; //something parsed from e
   string message_timestamp = DateTime.Now().ToString("yyyy-MM-dd HH:mm:ss");
   ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, str, message_timestamp));
}

问题是有时某些数据包会在错误的 serialNumber 下排队或重复自身(重复条目)。

像这样在 ConcurrentDictionary 中使用 ConcurrentQueue 是否聪明?

I have a thread that handles the message receiving every 10 seconds and have another one write these messages to the database every minute.
Each message has a different sender which is named serialNumber in my case.

Therefore, I created a ConcurrentDictionary like below.

public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

The key of the dictionary is serialNumber and the value is the collection of 1-minute messages. The reason I want to collect a minute of data is instead of going database every 10 seconds is go once in every minute so I can reduce the process by 1/6 times.

public class ShotManager
    {
        private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;
        private bool ACTIVE_FILE_DB_SHOOT_THREAD = false;
        private List<Devices> _devices = new List<Devices>();
        public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;

        public ShotManager()
        {
            ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
            init();
        }
        private void init()
        {
            using (iotemplaridbContext dbContext = new iotemplaridbContext())
                _devices = (from d in dbContext.Devices select d).ToList();

            if (_dicAllPackets is null)
                _dicAllPackets = new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();

            foreach (var device in _devices)
            {
                if(!_dicAllPackets.ContainsKey(device.SerialNumber))
                    _dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel> { });
            }
        }
        public void Spinner()
        {
            while (ACTIVE_FILE_DB_SHOOT_THREAD)
            {
                try
                {
                    Parallel.ForEach(_dicAllPackets, devicePacket =>
                    {
                        Thread.Sleep(100);
                        readAndShot(devicePacket);
                    });
                    Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
                    //init();
                }
                catch (Exception ex)
                {
                    //init();
                    tLogger.EXC("Spinner exception for write...", ex);
                }
            }
        }
        public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
        {
            if (_dicAllPackets != null)
            {
                if (!_dicAllPackets.ContainsKey(serialNumber))
                    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
                else
                    _dicAllPackets[serialNumber].Enqueue(model);
            }
        }
        private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
        {
            StringBuilder sb = new StringBuilder();
            if (keyValuePair.Value.Count() <= 0)
            {
                return;
            }
            sb.AppendLine(
quot;INSERT INTO ......) VALUES(");
//the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
            for (int i = 0; i < 10; i++)
            {
                keyValuePair.Value.TryDequeue(out PacketModel packet);
                if (packet != null)
                {
                    /*
                     *** do something and fill the sb...
                     */
                }
                else
                {
                    Console.WriteLine("No packet found! For Device: " + keyValuePair.Key);
                    break;
                }
            }
            insertIntoDB(sb.ToString()[..(sb.Length - 5)] + ";");
        }
    }

EnqueueObjectToQueue caller is from a different class like below.

private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
            PacketModel model = new PacketModel {
                MachineData = jsonPacket, 
                DataInsertedAt = messageTimeStamp
            };
            _shotManager.EnqueueObjectToQueue(serialNumber, model);
}

How I call the above function is from the handler function itself.

private void messageReceiveHandler(object sender, MessageReceviedEventArgs e){
   //do something...parse from e and call the func
   string jsonPacket = ""; //something parsed from e 
   string serialNumber = ""; //something parsed from e
   string message_timestamp = DateTime.Now().ToString("yyyy-MM-dd HH:mm:ss");
   ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, str, message_timestamp));
}

The problem is sometimes some packets are enqueued under the wrong serialNumber or repeat itself(duplicate entry).

Is it clever to use ConcurrentQueue in a ConcurrentDictionary like this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

绝影如岚 2025-01-20 01:13:57

不,使用带有嵌套 ConcurrentQueue 作为值的 ConcurrentDictionary 并不是一个好主意。不可能以原子方式更新此结构。以此为例:

if (!_dicAllPackets.ContainsKey(serialNumber))
    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
    _dicAllPackets[serialNumber].Enqueue(model);

这段小代码充满了竞争条件。运行此代码的线程可以在 ContainsKeyTryAdd[] 索引器和 ContainsKeyTryAdd[] 索引器和 < code>Enqueue 调用,更改结构的状态,并使当前线程工作正确性所依据的条件无效。

当您有一个包含不可变值的简单字典,并且想要并发使用它并使用时,ConcurrentDictionary 是个好主意每次访问都可能会产生严重的争用。您可以在此处阅读有关此内容的更多信息:我应该何时使用 ConcurrentDictionary 和 Dictionary?

我的建议是切换到简单的 Dictionary>,并将其与锁定。如果您小心并避免在持有锁时做任何不相关的事情,那么锁将很快被释放,以至于其他线程很少会被它阻塞。使用锁只是为了保护结构的特定条目的读取和更新,除此之外别无其他。

替代设计

在您从未从字典中删除队列的情况下,ConcurrentDictionary> 结构可能是一个不错的选择。否则,仍然存在发生竞争条件的空间。您应该专门使用 GetOrAdd 方法以原子方式在字典中获取或添加队列,并且在对其执行任何操作(读取或写入)之前始终使用队列本身作为储物柜

Queue<PacketModel> queue = _dicAllPackets
    .GetOrAdd(serialNumber, _ => new Queue<PacketModel>());
lock (queue)
{
    queue.Enqueue(model);
}

ConcurrentDictionary> 也是可能的,因为在这种情况下,ConcurrentDictionary 的值是不可变的,并且您不需要锁定任何东西。您需要始终使用 AddOrUpdate 方法,以便通过一次调用更新字典,作为原子操作。

_dicAllPackets.AddOrUpdate
(
    serialNumber,
    key => ImmutableQueue.Create<PacketModel>(model),
    (key, queue) => queue.Enqueue(model)
);

updateValueFactory 委托内的 queue.Enqueue(model) 调用不会改变队列。相反,它会创建一个新的 ImmutableQueue< PacketModel> 并丢弃前一个。一般来说,不可变集合的效率不是很高。但是,如果您的目标是最大限度地减少线程之间的争用,但代价是增加每个线程必须完成的工作,那么您可能会发现它们很有用。

No, it's not a good idea to use a ConcurrentDictionary with nested ConcurrentQueues as values. It's impossible to update atomically this structure. Take this for example:

if (!_dicAllPackets.ContainsKey(serialNumber))
    _dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
    _dicAllPackets[serialNumber].Enqueue(model);

This little piece of code is riddled with race conditions. A thread that is running this code can be intercepted by another thread at any point between the ContainsKey, TryAdd, the [] indexer and the Enqueue invocations, altering the state of the structure, and invalidating the conditions on which the correctness of the current thread's work is based.

A ConcurrentDictionary is a good idea when you have a simple Dictionary that contains immutable values, you want to use it concurrently, and using a lock around each access could potentially create significant contention. You can read more about this here: When should I use ConcurrentDictionary and Dictionary?

My suggestion is to switch to a simple Dictionary<string, Queue<PacketModel>>, and synchronize it with a lock. If you are careful and you avoid doing anything irrelevant while holding the lock, the lock will be released so quickly that rarely other threads will be blocked by it. Use the lock just to protect the reading and updating of a specific entry of the structure, and nothing else.

Alternative designs

A ConcurrentDictionary<string, Queue<PacketModel>> structure might be a good option, under the condition that you never removed queues from the dictionary. Otherwise there is still space for race conditions to occur. You should use exclusively the GetOrAdd method to get or add atomically a queue in the dictionary, and also use always the queue itself as a locker before doing anything with it (either reading or writing):

Queue<PacketModel> queue = _dicAllPackets
    .GetOrAdd(serialNumber, _ => new Queue<PacketModel>());
lock (queue)
{
    queue.Enqueue(model);
}

Using a ConcurrentDictionary<string, ImmutableQueue<PacketModel>> is also possible because in this case the value of the ConcurrentDictionary is immutable, and you won't need to lock anything. You'll need to use always the AddOrUpdate method, in order to update the dictionary with a single call, as an atomic operation.

_dicAllPackets.AddOrUpdate
(
    serialNumber,
    key => ImmutableQueue.Create<PacketModel>(model),
    (key, queue) => queue.Enqueue(model)
);

The queue.Enqueue(model) call inside the updateValueFactory delegate does not mutate the queue. Instead it creates a new ImmutableQueue<PacketModel> and discards the previous one. The immutable collections are not very efficient in general. But if your goal is to minimize the contention between threads, at the cost of increasing the work that each thread has to do, then you might find them useful.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文