多线程:限制并发线程数

发布于 2024-11-07 16:08:54 字数 1582 浏览 0 评论 0原文

我需要开发一个使用多线程的应用程序。

基本上,我有一个包含大约 200k 行的 DataTable。 我需要从每一行中取出一个字段,将其与网页进行比较, 然后将其从数据表中删除。

问题是,为这些页面提供服务的服务器对并发请求有限制。 所以我最多可以同时请求 3 页。

我想通过使用线程池来做到这一点, 我什至设法构建了一个简单的应用程序来做到这一点(锁定数据表) 但我无法限制并发线程(即使使用 SetMaxThreads ),它似乎只是忽略了限制。

有人有现成的东西可以做类似的事情吗? 我很想看看。

我尝试过使用信号量,但遇到了问题:

        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.

        lock (records)
        {
            records.Remove((string)id);
        }

        _sem.Release();
    }

运行得很好,唯一的问题是

Console.WriteLine(records.count);

返回不同的结果。 即使我知道这种情况发生,因为并非所有线程都已完成(在删除所有记录之前,我正在调用records.count),我找不到如何等待所有线程完成。

I need to develop an app that is using multithreading.

Basicly, I have a DataTable that contains around 200k rows.
From each row, I need to take a field, compare it to a webpage,
and then remove it from the datatable.

The thing is, the server serving those pages has a limit on concurrent requests.
so at max I can ask for 3 pages at the same time.

I want to do this by using the threadpool,
I even managed building a simple app that does that ( locks the datatable )
but I couldn't limit the concurrent threads ( even with SetMaxThreads ) it seems like it just ignored the limit.

does anyone have something ready made that does something similar ?
I would love to see.

i have tried using semaphores, but got into problems:

        static SemaphoreSlim _sem = new SemaphoreSlim(3);    // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++ )
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");           // Only three threads
        //Thread.Sleep(1000 * (int)id);               // can be here at
        Console.WriteLine(id + " is leaving");       // a time.

        lock (records)
        {
            records.Remove((string)id);
        }

        _sem.Release();
    }

this runs quite nicely, the only problem is,

Console.WriteLine(records.count);

returns diffrent results.
even due i understand that it happens since not all the threads have finished ( an i a m calling the records.count before all records have been removed) i couldnt find how to wait for all to finish.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

兮子 2024-11-14 16:08:54

要等待多个线程完成,您可以使用多个 EventWaitHandle,然后调用 WaitHandle.WaitAll 阻塞主线程,直到所有事件都发出信号:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();

for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);

    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];

    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });
}

WaitHandle.WaitAll(finishEvents.ToArray());

由于大多数这些线程在大多数情况下都会挂起,因此最好在这种情况下使用 ThreadPool,因此您可以将 new Thread 替换为:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

当您完成事件时,不要忘记处理它们:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[编辑]

为了将所有内容放在一个地方,您的示例代码应如下所示:(

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

static void Main()
{
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });

        t.Start();
    }

    WaitHandle.WaitAll(finishEvents.ToArray());

    Console.WriteLine(_records.Count);
    Console.ReadLine();
}

static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();

    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");

    lock (_records)
    {
        _records.Remove((string)id);
    }

    _sem.Release();
}

请注意,我使用了 Semaphore 而不是 SemaphoreSlim,因为我不这台机器上没有 .NET 4,我想在更新答案之前测试代码)

To wait for multiple threads to finish, you can use multiple EventWaitHandle's and then call WaitHandle.WaitAll to block the main thread until all events are signalled:

// we need to keep a list of synchronization events
var finishEvents = new List<EventWaitHandle>();

for (int i = 0; i < records.Count; i++ )
{
    // for each job, create an event and add it to the list
    var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
    finishEvents.Add(signal);

    // we need to catch the id in a separate variable
    // for the closure to work as expected
    var id = records[i];

    var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });
}

WaitHandle.WaitAll(finishEvents.ToArray());

Since most of these threads would end up suspended most of the time, it would be better to use ThreadPool in this case, so you can replace new Thread with:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

When you are done with the events, don't forget to Dispose them:

foreach (var evt in finishEvents)
{
    evt.Dispose();
}

[Edit]

To put it all in one place, here is what your example code should look like:

static Semaphore _sem = new Semaphore(3, 3);    // Capacity of 3
static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

static void Main()
{
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < _records.Count; i++)
    {
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        var id = _records[i];
        var t = new Thread(() =>
        {
            ThreadJob(id);
            signal.Set();
        });

        t.Start();
    }

    WaitHandle.WaitAll(finishEvents.ToArray());

    Console.WriteLine(_records.Count);
    Console.ReadLine();
}

static void ThreadJob(object id)
{
    Console.WriteLine(id + " wants to enter");
    _sem.WaitOne();

    Console.WriteLine(id + " is in!");
    Thread.Sleep(1000);
    Console.WriteLine(id + " is leaving");

    lock (_records)
    {
        _records.Remove((string)id);
    }

    _sem.Release();
}

(note that I've used Semaphore instead of SemaphoreSlim because I don't have .NET 4 on this machine and I wanted to test the code before updating the answer)

清君侧 2024-11-14 16:08:54

为什么不使用并行扩展 - 这会让事情变得更容易。

不管怎样,你可能想看看信号量之类的东西。一两个月前我写了一篇关于这个主题的博客文章,您可能会觉得有用:https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/

Why not use the Parallel Extensions - That would make things a lot easier.

Anyway, what you probably want to look at is something like Semaphores. I wrote a blog post on this subject a month or two back that you might find useful: https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/

不…忘初心 2024-11-14 16:08:54

你可以使用
Semaphore(如果您使用的是 .net 3.5)

.net 4.0 中的 SemaphoreSlim代码>

you can use
Semaphore if you are under .net 3.5

or

SemaphoreSlim in .net 4.0

写下不归期 2024-11-14 16:08:54

首先,应该
Console.WriteLine(id + "要离开");
不是晚一点,在锁定之后、释放信号量之前吗?

至于实际等待所有线程完成,从长远来看,Groo 的答案看起来更好、更稳健,但作为此特定代码段的更快/更简单的解决方案,我认为您也可以只调用 .按顺序对您想要等待的所有线程进行 Join() 。

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

然后在启动线程时,将当前的新线程行替换为:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

然后在 Console.WriteLine 之前:

foreach( Thread t in ThreadList )
{
    t.Join();
}

如果任何线程没有终止,并且如果您想知道哪个线程还没有终止,则这将锁定还没完,这个方法不行。

First, should
Console.WriteLine(id + " is leaving");
not be a bit later, after the lock and just before it releases the semaphore?

As to the actual waiting for all of the threads to finish, Groo's answer looks better and more robust in the long term, but as a quicker/simpler solution to this specific piece of code, I think you could also get away with just calling .Join() on all of the threads you want to wait for, in sequence.

static List<Thread> ThreadList = new List<Thread>(); // To keep track of them

then when starting the threads, replace the current new Thread line with:

ThreadList.Add(new Thread(ThreadJob).Start(records[i]));

and then just before the Console.WriteLine:

foreach( Thread t in ThreadList )
{
    t.Join();
}

This will lock up if any of the threads don't terminate though, and if you ever want to know -which- threads haven't finished, this method won't work.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文