How to use ConcurrentQueue for threaded processing

Published 2024-10-09


I am trying to figure out the best way of working with a queue. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous one. There is one problem: there are too many records to hold until the final BulkCopy, which causes an OutOfMemoryException.

So, I have determined that I should process each incoming DataTable immediately. I am thinking about ConcurrentQueue<T>, but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

For instance:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

My first question is: aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously, will this be all that I need? Second, is there something I'm missing about the way ConcurrentQueue<T> functions, and does it need some form of trigger to work asynchronously with the queued objects?

Update
I have just derived a class from ConcurrentQueue<T> that has an OnItemQueued event handler. Then:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Any concerns about this implementation?


Comments (3)

感悟人生的甜 2024-10-16 16:14:37


From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is "Producer/Consumer". First, you have some tasks producing work (and adding items to the queue). Second, you have another task consuming things from the queue (and dequeuing items).

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.
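A minimal sketch of that producer/consumer split. This uses BlockingCollection<DataTable>, which wraps a ConcurrentQueue<T> by default and lets the consumer block until items arrive instead of polling; the type and method names here are illustrative, not the asker's actual code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Runs a consumer task that drains tables while the caller produces them.
    public static List<string> Run(IEnumerable<DataTable> source)
    {
        var written = new List<string>();
        using (var queue = new BlockingCollection<DataTable>())
        {
            // Consumer: blocks until items arrive, exits when adding completes.
            var consumer = Task.Run(() =>
            {
                foreach (DataTable table in queue.GetConsumingEnumerable())
                    written.Add(table.TableName);   // stand-in for BulkCopy
            });

            // Producer: enqueue each extracted table as it is produced.
            foreach (DataTable table in source)
                queue.Add(table);

            queue.CompleteAdding();   // no more items; lets the consumer loop end
            consumer.Wait();
        }
        return written;
    }

    public static void Main()
    {
        var tables = new[] { new DataTable("A"), new DataTable("B") };
        foreach (string name in Run(tables))
            Console.WriteLine("Wrote " + name);
    }
}
```

Because the consumer blocks inside GetConsumingEnumerable(), no event or polling loop is needed, which addresses the "no events available" problem in the question.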

素染倾城色 2024-10-16 16:14:37


I think ConcurrentQueue is useful only in very few cases. Its main advantage is that it is lock free. However, usually the producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process. This signalling between threads needs locks and negates the benefit of using ConcurrentQueue. The fastest way to synchronize threads is using Monitor.Pulse(), which works only within a lock. All other synchronization tools are even slower.

Of course, the consumer can just continuously check if there is something in the queue, which works without locks, but is a huge waste of processor resources. A little bit better is if the consumer waits between checking.

Raising an event (or spawning a thread) on every write to the queue is a very bad idea. The microsecond you might save by using ConcurrentQueue will be completely wasted by executing the event handler, which might take 1000 times longer.

Although I could imagine one use case for ConcurrentQueue: When the producers are faster than the consumer and the whole thing stops once the queue is empty. In this case the consumer can avoid an idle wait loop.

If all the processing is done in an event handler or an async call anyway, the question is why a queue is still needed. Better to pass the data directly to the handler and not use a queue at all.

Please note that the implementation of ConcurrentQueue is rather complicated in order to allow concurrency. In most cases it is better to use a normal Queue<> and lock every access to the queue. Since a queue access takes only microseconds, it is extremely unlikely that two threads access the queue in the same microsecond, and there will hardly ever be any delay because of locking. Using a normal Queue<> with locking will often result in faster code execution than ConcurrentQueue.
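A sketch of the alternative this answer describes: a plain Queue<T> guarded by a lock, with Monitor.Pulse/Monitor.Wait signaling the consumer. The class and method names are illustrative:

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;

public class LockedTableQueue
{
    private readonly Queue<DataTable> queue = new Queue<DataTable>();
    private bool completed;

    public void Enqueue(DataTable table)
    {
        lock (queue)
        {
            queue.Enqueue(table);
            Monitor.Pulse(queue);   // wake one waiting consumer
        }
    }

    public void Complete()
    {
        lock (queue)
        {
            completed = true;
            Monitor.PulseAll(queue);   // release all waiting consumers
        }
    }

    // Blocks until an item is available; returns false once the queue
    // is drained and Complete() has been called.
    public bool TryDequeue(out DataTable table)
    {
        lock (queue)
        {
            while (queue.Count == 0 && !completed)
                Monitor.Wait(queue);   // releases the lock while waiting

            if (queue.Count > 0)
            {
                table = queue.Dequeue();
                return true;
            }
            table = null;
            return false;
        }
    }
}
```

The consumer thread simply loops on TryDequeue and writes each table; Monitor.Wait avoids both the busy-wait loop and the per-item event handler the answer warns against.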

许你一世情深 2024-10-16 16:14:37


This is the complete solution I came up with:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Serialization;

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with the table
        //  what I can't figure out is how to pass a custom object in the 3rd parameter
        RunPostProcess.BeginInvoke(e.Table, new AsyncCallback(PostComplete), filename);
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
            XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());

            byte[] bytes = StringToUTF8ByteArray(xmlstream);
            using (var fileStream = new FileStream(file, FileMode.Create))
                fileStream.Write(bytes, 0, bytes.Length);   // write the full encoded buffer, not Length + 2
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    new public void Enqueue (DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    #region Fields
    #endregion

    #region Init
    public TableQueuedEventArgs(DataTable Table)
    {this.Table = Table;}
    #endregion

    #region Functions
    #endregion

    #region Properties
    public DataTable Table
    {get;set;}
    #endregion
}

As proof of concept, it seems to work pretty well. At most I saw 4 worker threads.
