如何连续 QueueUserWorkItems 但不立即将它们全部排队?

发布于 2024-08-04 12:45:35 字数 2113 浏览 9 评论 0原文

我正在为网站开发一个多线程抓取器,根据不同的问题,我决定将 ThreadPool 与 QueueUserWorkItem() 一起使用。

如何连续对工作项目进行排队而不一次性将它们全部排队?我需要排队> 300k 个项目(每个用户 ID 一个),如果我循环将它们全部排队,我将耗尽内存。

所以,我想要的是:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

在此期间,当线程可用时,webScraper 会不断添加所有 300000 个工作项。

这是我到目前为止所得到的:

public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

这是正确的方法吗?

任何人都可以为我指明正确的方向,以便在调用 Start() 后在后台处理工作项的创建,而不是立即创建所有工作项吗?

I'm working on a multi-threaded scraper for a website and as per a different question I've decided to use the ThreadPool with QueueUserWorkItem().

How can I continually Queue work items without queuing them all at once? I need to queue > 300k items (one for each userID) and if I loop to queue them all I'll run out of memory.

So, what I would like is:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

During this time, webScraper is continuouslly adding all 300000 workItems as threads become available.

Here is what I have so far:

public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

Is this the right approach?

Can anyone point me in the right direction for handling the creation of my work items while in the background once Start() is called, and not creating all Work items at once?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

暗地喜欢 2024-08-11 12:45:35

如果从工作队列中窃取工作的工作项较少,是否可以更好地实现这一点?仅仅因为你有 300,000 件工作要做,并不意味着你需要 300,000 名工人来完成。显然,由于您只有几个核心,因此只有少数工作可以并行发生,那么为什么不将大量工作分配给更少的工作人员呢?

根据每项工作所花费的时间的恒定程度,您可以将其平均分配给每个工作人员,也可以拥有一个中央队列(您必须锁定该队列),并且每个工作人员都可以在工作用完时获取一些工作。

编辑:

Joe Duffy 似乎有一系列关于在这里编写工作窃取队列的文章: http://www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx。 .Net 4 的线程池看起来也会更加智能。但我认为对于这种情况你不需要特别复杂的东西。

Would this be better implemented with less Work Items that steal work from a queue of work? Just because you have 300,000 pieces of work to do it doesn't mean you need 300,000 workers to do it. Obviously as you only have a few cores, only a few of these pieces of work can be happening in parallel, so why not hand out chunks of work to much fewer workers?

Depending on how constant the time taken for each piece of work is, you can either split it all evenly across each worker or have a central queue (that you'll have to lock around) and each worker can grab some work as it runs out.

EDIT:

Joe Duffy seems to have a series about writing a Work Stealing Queue here: http://www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx. It also looks like .Net 4's Threadpool is going to be a bit smarter. But I don't think you need something particularly complex for this scenario.

再见回来 2024-08-11 12:45:35

我认为创建一个排队项目队列似乎不太正确,那么在 WorkItems 完成后再次让它们自己排队怎么样?

您的 Start 方法可以排队,例如 3 倍 MaxThreads 项(在您的示例中为 75),然后您的 Process 方法在完成时自行排队。这样,您的 Start 方法会快速返回,但会触发许多工作项,正如我所说,然后它们会自行触发:


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            if (currentUserID < MaxUserID)
            {
                Interlocked.Increment(ref currentUserID);
                //Parse stats for currentUserID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

我确信为了安全起见,应该使用 Interlocked 设置 Running 标志。我已将乘数放入一个属性中,该属性可以传递给构造函数 - 我相当确定可以对其进行调整以调整性能,具体取决于解析这些统计数据所需的时间。

I think creating a queue of queued items doesn't seem quite right somehow, so how about making the WorkItems queue themselves again after they've finished?

Your Start method could queue up, say, 3 times MaxThreads items (75 in your example) and then your Process method queues itself when it's finished. That way your Start method returns quickly but fires off a number of work items, which as I say then fire themselves:


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            if (currentUserID < MaxUserID)
            {
                Interlocked.Increment(ref currentUserID);
                //Parse stats for currentUserID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

I'm sure the Running flag should be being set using Interlocked for safety. I've made the multiplier into a property, which could be passed to the constructor - I'm fairly sure it could be adjusted to tweak performance, depending on how long those stats take to parse.

你的往事 2024-08-11 12:45:35

看起来您需要一个主流程控制类来管理正在启动的工作人员数量并保持队列已满。

然后,您可以使用两个队列:

  1. 一个用于保存需要抓取的所有项目,
  2. 第二个用于完成工作,

然后该主/总督对象将保持循环,直到队列 #1 中的所有项目都消失并且它将继续添加到队列中#2 当你有可用的周期时。

It looks like you need a Master process control class that governs the amount of workers that are firing off and keeps the Queue full.

You could work with two queues then:

  1. One to hold all the items you need to scrape
  2. Second to do the work

This Master/Governor object would then keep a loop until all your items from Queue #1 are gone and it would keep adding to Queue #2 when you have available cycles.

暗喜 2024-08-11 12:45:35

我绝对不会使用 ThreadPool.SetMaxThreads - 记住线程池在所有进程之间共享 - 设置最大线程数只会降低性能。线程池背后的整个想法是,您不需要指定诸如最大线程数之类的东西 - .Net 框架会计算出要分配的最佳线程数 - 您不需要这样做。

请注意,排队 300 000 个项目不会导致产生 300 000 个线程 - ThreadPool 类将为您管理线程数并根据需要重新使用线程。如果您只是担心这种方式会消耗太多资源,我建议您改进流程 - 也许创建一个“Spawner”类来运行 1000 个抓取实例?

I definitely won't use ThreadPool.SetMaxThreads - remember that the threadpool is shared between all processes - setting the maximum amount of threads would simply kill performance. The whole idea behind the threadpool is that you don't need to specify things like the maximum amount of threads - the .Net framework figures out the optimum amount of threads to allocate - you don't need to do it.

Note that queuing 300 000 items would not cause 300 000 threads to spawn - the ThreadPool class will manage the number of threads for you and re-use threads as necessary. If you are simply worried that too many resources will be consumed this way I would recommend that you refine your process - perhaps create a 'Spawner' class which in turn runs 1000 of the scraper instances?

童话里做英雄 2024-08-11 12:45:35

您可以使用不同的线程池。这是一个:http://www.codeplex.com/smartthreadpool
它允许您一次对所有物品进行排队。您可以指定要创建的最大线程数。假设您有 1000 个工作项并分配了 100 个线程。它将立即获取前 100 个项目并开始运行,而其余的则等待。一旦其中一个项目完成并且线程释放,下一个排队项目就会启动。它管理所有工作,但不会使线程和内存饱和。此外,它不使用 .net 线程池中的线程。

You can use a different thread pool. Here is one: http://www.codeplex.com/smartthreadpool
It allows you to queue up all your items at once. You can assign a max number of threads to create. Say you have 1000 work items and you assign 100 threads. It will immediately take the first 100 items and get them going while the rest wait. As soon as one of those items is done and a thread frees up, the next queued item is started. It manages all the work but won't saturate threads and memory. Also, it doesn't use threads from the .net thread pool.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文