ConcurrentDictionary 中的 ConcurrentQueue 重复错误
我有一个线程处理每 10 秒接收的消息,并让另一个线程每分钟将这些消息写入数据库。
每条消息都有一个不同的发件人,在我的例子中名为 serialNumber
。
因此,我创建了一个如下所示的 ConcurrentDictionary。
public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;
字典的键是serialNumber
,值为1分钟消息的集合。 我想要收集一分钟数据的原因是每分钟一次而不是每 10 秒一次去数据库,这样我可以将过程减少 1/6 倍。
public class ShotManager
{
private const int SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER = 25000;
private bool ACTIVE_FILE_DB_SHOOT_THREAD = false;
private List<Devices> _devices = new List<Devices>();
public ConcurrentDictionary<string, ConcurrentQueue<PacketModel>> _dicAllPackets;
public ShotManager()
{
ACTIVE_FILE_DB_SHOOT_THREAD = Utility.GetAppSettings("AppConfig", "0", "ACTIVE_LIST_DB_SHOOT") == "1";
init();
}
private void init()
{
using (iotemplaridbContext dbContext = new iotemplaridbContext())
_devices = (from d in dbContext.Devices select d).ToList();
if (_dicAllPackets is null)
_dicAllPackets = new ConcurrentDictionary<string, ConcurrentQueue<PacketModel>>();
foreach (var device in _devices)
{
if(!_dicAllPackets.ContainsKey(device.SerialNumber))
_dicAllPackets.TryAdd(device.SerialNumber, new ConcurrentQueue<PacketModel> { });
}
}
public void Spinner()
{
while (ACTIVE_FILE_DB_SHOOT_THREAD)
{
try
{
Parallel.ForEach(_dicAllPackets, devicePacket =>
{
Thread.Sleep(100);
readAndShot(devicePacket);
});
Thread.Sleep(SLEEP_THREAD_FOR_FILE_LIST_DB_SHOOTER);
//init();
}
catch (Exception ex)
{
//init();
tLogger.EXC("Spinner exception for write...", ex);
}
}
}
public void EnqueueObjectToQueue(string serialNumber, PacketModel model)
{
if (_dicAllPackets != null)
{
if (!_dicAllPackets.ContainsKey(serialNumber))
_dicAllPackets.TryAdd(serialNumber, new ConcurrentQueue<PacketModel> { });
else
_dicAllPackets[serialNumber].Enqueue(model);
}
}
private void readAndShot(KeyValuePair<string, ConcurrentQueue<PacketModel>> keyValuePair)
{
StringBuilder sb = new StringBuilder();
if (keyValuePair.Value.Count() <= 0)
{
return;
}
sb.AppendLine($"INSERT INTO ......) VALUES(");
//the reason why I don't use while(TryDequeue(out ..)){..} is there's constantly enqueue to this dictionary, so the thread will be occupied with a single device for so long
for (int i = 0; i < 10; i++)
{
keyValuePair.Value.TryDequeue(out PacketModel packet);
if (packet != null)
{
/*
*** do something and fill the sb...
*/
}
else
{
Console.WriteLine("No packet found! For Device: " + keyValuePair.Key);
break;
}
}
insertIntoDB(sb.ToString()[..(sb.Length - 5)] + ";");
}
}
EnqueueObjectToQueue
调用者来自不同的类,如下所示。
private void packetToDictionary(string serialNumber, string jsonPacket, string messageTimeStamp)
{
PacketModel model = new PacketModel {
MachineData = jsonPacket,
DataInsertedAt = messageTimeStamp
};
_shotManager.EnqueueObjectToQueue(serialNumber, model);
}
我如何调用上述函数是来自处理函数本身。
private void messageReceiveHandler(object sender, MessageReceviedEventArgs e){
//do something...parse from e and call the func
string jsonPacket = ""; //something parsed from e
string serialNumber = ""; //something parsed from e
string message_timestamp = DateTime.Now().ToString("yyyy-MM-dd HH:mm:ss");
ThreadPool.QueueUserWorkItem(state => packetToDictionary(serialNumber, str, message_timestamp));
}
问题是有时某些数据包会在错误的 serialNumber
下排队或重复自身(重复条目)。
像这样在 ConcurrentDictionary 中使用 ConcurrentQueue 是否聪明?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
不,使用带有嵌套
ConcurrentQueue
作为值的ConcurrentDictionary
并不是一个好主意。不可能以原子方式更新此结构。以此为例:这段小代码充满了竞争条件。运行此代码的线程可以在
ContainsKey
、TryAdd
、[]
索引器和ContainsKey
、TryAdd
、[]
索引器和 < code>Enqueue 调用,更改结构的状态,并使当前线程工作正确性所依据的条件无效。当您有一个包含不可变值的简单字典,并且想要并发使用它并使用
锁
时,ConcurrentDictionary
是个好主意每次访问都可能会产生严重的争用。您可以在此处阅读有关此内容的更多信息:我应该何时使用 ConcurrentDictionary 和 Dictionary?我的建议是切换到简单的
Dictionary>
,并将其与锁定
。如果您小心并避免在持有锁时做任何不相关的事情,那么锁将很快被释放,以至于其他线程很少会被它阻塞。使用锁只是为了保护结构的特定条目的读取和更新,除此之外别无其他。替代设计
在您从未从字典中删除队列的情况下,
ConcurrentDictionary>
结构可能是一个不错的选择。否则,仍然存在发生竞争条件的空间。您应该专门使用 GetOrAdd 方法以原子方式在字典中获取或添加队列,并且在对其执行任何操作(读取或写入)之前始终使用队列本身作为储物柜:
ConcurrentDictionary>
也是可能的,因为在这种情况下,ConcurrentDictionary
的值是不可变的,并且您不需要锁定任何东西。您需要始终使用
AddOrUpdate
方法,以便通过一次调用更新字典,作为原子操作。updateValueFactory
委托内的queue.Enqueue(model)
调用不会改变队列。相反,它会创建一个新的ImmutableQueue< PacketModel>
并丢弃前一个。一般来说,不可变集合的效率不是很高。但是,如果您的目标是最大限度地减少线程之间的争用,但代价是增加每个线程必须完成的工作,那么您可能会发现它们很有用。No, it's not a good idea to use a
ConcurrentDictionary
with nestedConcurrentQueue
s as values. It's impossible to update atomically this structure. Take this for example:This little piece of code is riddled with race conditions. A thread that is running this code can be intercepted by another thread at any point between the
ContainsKey
,TryAdd
, the[]
indexer and theEnqueue
invocations, altering the state of the structure, and invalidating the conditions on which the correctness of the current thread's work is based.A
ConcurrentDictionary
is a good idea when you have a simpleDictionary
that contains immutable values, you want to use it concurrently, and using alock
around each access could potentially create significant contention. You can read more about this here: When should I use ConcurrentDictionary and Dictionary?My suggestion is to switch to a simple
Dictionary<string, Queue<PacketModel>>
, and synchronize it with alock
. If you are careful and you avoid doing anything irrelevant while holding the lock, the lock will be released so quickly that rarely other threads will be blocked by it. Use the lock just to protect the reading and updating of a specific entry of the structure, and nothing else.Alternative designs
A
ConcurrentDictionary<string, Queue<PacketModel>>
structure might be a good option, under the condition that you never removed queues from the dictionary. Otherwise there is still space for race conditions to occur. You should use exclusively the GetOrAdd method to get or add atomically a queue in the dictionary, and also use always thequeue
itself as a locker before doing anything with it (either reading or writing):Using a
ConcurrentDictionary<string, ImmutableQueue<PacketModel>>
is also possible because in this case the value of theConcurrentDictionary
is immutable, and you won't need tolock
anything. You'll need to use always theAddOrUpdate
method, in order to update the dictionary with a single call, as an atomic operation.The
queue.Enqueue(model)
call inside theupdateValueFactory
delegate does not mutate the queue. Instead it creates a newImmutableQueue<PacketModel>
and discards the previous one. The immutable collections are not very efficient in general. But if your goal is to minimize the contention between threads, at the cost of increasing the work that each thread has to do, then you might find them useful.