Thread.Sleep blocks parallel execution of tasks

Posted 2024-12-06 12:03:10

I'm calling a worker method that calls the database and then iterates over and yield-returns values for parallel processing. To prevent it from hammering the database, I have a Thread.Sleep in there to pause the calls to the DB. However, this appears to be blocking executions that are still occurring in the Parallel.ForEach. What is the best way to achieve this without blocking?

private void ProcessWorkItems()
{
    _cancellation = new CancellationTokenSource();
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems());

    Task.Factory.StartNew(() =>
        Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();
        }), _cancellation.Token);
}

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems()
{
    while (!_cancellation.IsCancellationRequested)
    {
        var workItems = WorkItemRepository.GetItemList(); //database call

        workItems.ForEach(item =>
        {
            item.QueueWorkItem(WorkItemRepository);
        });

        foreach (var item in workItems)
        {
            yield return item;
        }

        if (workItems.Count == 0)
        {
            Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items.
        }
    }

    yield break;
}

Edit:
I changed it to include the answer and it's still not working as I'm expecting. I added the .AsParallel().WithDegreeOfParallelism(10) to the GetWorkItems() call. Are my expectations incorrect when I think that Parallel should continue to execute even though the base thread is sleeping?

Example:
I have 15 items; it iterates, grabs 10 items, and starts them. As each one finishes, it asks GetWorkItems for another item until it tries to ask for a 16th. At that point it should stop trying to grab more items but should continue processing items 11-15 until those are complete. Is that how Parallel should work? Because it's not currently doing that. What it currently does is that once it completes 6 items, it locks up the 10 that are still running in the Parallel.ForEach.

Comments (4)

北笙凉宸 2024-12-13 12:03:11

You can use the .WithDegreeOfParallelism() extension method to force PLINQ to run the tasks simultaneously. There's a good example in the "Call Blocking or I/O Intensive" section of the C# Threading Handbook.
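
For illustration, a minimal sketch of that suggestion applied to the question's code, assuming the question's GetWorkItems, ItemFactory and _cancellation members (my reading of the answer, not code taken from it). Letting PLINQ's ForAll drive the processing means the WithDegreeOfParallelism(10) hint applies to the work itself instead of being handed to Parallel.ForEach:

Task.Factory.StartNew(() =>
    GetWorkItems()
        .AsParallel()
        .WithDegreeOfParallelism(10)  // up to 10 items processed concurrently
        .ForAll(workItem =>
        {
            var x = ItemFactory(workItem);
            x.doWork();               // same per-item work as in the question
        }),
    _cancellation.Token);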

心是晴朗的。 2024-12-13 12:03:11

You may be falling foul of the Partitioner.

Because you are passing an IEnumerable, Parallel.ForEach will use a Chunk Partitioner which can try to grab a few elements at a time from the enumeration in a chunk. But your IEnumerable.MoveNext can sleep, which will upset things.

You could write your own Partitioner that returns one element at a time, but in any case, I think a producer/consumer approach such as Jim Mischel's suggestion will work better.
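
If you do stay with Parallel.ForEach, a hedged sketch of the one-element-at-a-time idea: instead of hand-writing a Partitioner, Partitioner.Create with EnumerablePartitionerOptions.NoBuffering (System.Collections.Concurrent, .NET 4.5 and later) hands the loop a single item per request, so a Thread.Sleep inside the enumerator only delays workers that are waiting for their next item, not those still processing. This swaps the built-in option in for the custom partitioner the answer mentions:

var oneAtATime = Partitioner.Create(GetWorkItems(), EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(
    oneAtATime,
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 10,
        CancellationToken = _cancellation.Token
    },
    workItem =>
    {
        // Same per-item work as in the question.
        var x = ItemFactory(workItem);
        x.doWork();
    });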

傲性难收 2024-12-13 12:03:11

What are you trying to accomplish with the sleep? From what I can tell, you're trying to avoid pounding the database with calls. I don't know of a better way to do that, but ideally your GetItemList call would block until data was available to process.

ˉ厌 2024-12-13 12:03:10

I would suggest that you create a BlockingCollection (a queue) of work items, and a timer that calls the database every 30 seconds to populate it. Something like:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>();

And on initialization:

System.Threading.Timer WorkItemTimer = new Timer((s) =>
    {
        var items = WorkItemRepository.GetItemList(); //database call
        foreach (var item in items)
        {
            WorkItems.Add(item);
        }
    }, null, 30000, 30000); // first fires 30 seconds after creation, then every 30 seconds

That will query the database for items every 30 seconds.

For scheduling the work items to be processed, you have a number of different solutions. The closest to what you have would be this:

WorkItem item;

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation.Token)) // blocks until an item arrives or cancellation is requested
{
    Task.Factory.StartNew((s) =>
        {
            var myItem = (WorkItem)s;
            // process here
        }, item);
}

That eliminates blocking in any of the threads, and lets the TPL decide how best to allocate the parallel tasks.

EDIT:

Actually, closer to what you have is:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation.Token))
{
    // start task to process item
}

You might be able to use:

Parallel.ForEach(WorkItems.GetConsumingEnumerable(_cancellation.Token).AsParallel() ...

I don't know if that will work or how well. Might be worth a try . . .
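
For what it's worth, a hedged sketch of that combination, swapping in the NoBuffering partitioner idea from the other answer rather than the AsParallel form above, so the consuming enumerable is taken one item at a time instead of in buffered chunks (ItemFactory and doWork are the question's members; none of this was verified here):

Parallel.ForEach(
    Partitioner.Create(WorkItems.GetConsumingEnumerable(_cancellation.Token),
                       EnumerablePartitionerOptions.NoBuffering),
    new ParallelOptions
    {
        MaxDegreeOfParallelism = 10,
        CancellationToken = _cancellation.Token
    },
    item =>
    {
        var x = ItemFactory(item);
        x.doWork();
    });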

END OF EDIT

In general, what I'm suggesting is that you treat this as a producer/consumer application, with the producer being the thread that queries the database periodically for new items. My example queries the database once every N (30 in this case) seconds, which will work well if, on average, you can empty your work queue every 30 seconds. That will give an average latency of less than a minute from the time an item is posted to the database until you have the results.

You can reduce the polling frequency (and thus the latency), but that will cause more database traffic.

You can get fancier with it, too. For example, if you poll the database after 30 seconds and you get a huge number of items, then it's likely that you'll be getting more soon, and you'll want to poll again in 15 seconds (or less). Conversely, if you poll the database after 30 seconds and get nothing, then you can probably wait longer before you poll again.

You can set up that kind of adaptive polling using a one-shot timer. That is, you specify -1 for the last parameter when you create the timer, which causes it to fire only once. Your timer callback figures out how long to wait before the next poll and calls Timer.Change to initialize the timer with the new value.
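
A hedged sketch of that one-shot pattern, reusing the WorkItems collection and WorkItemRepository from above; the thresholds and intervals are made-up illustration values, and pollTimer is a name introduced here, not something from the answer:

// One-shot adaptive polling: the timer is created with period = Timeout.Infinite (-1),
// so it fires once, and the callback re-arms it with whatever delay seems appropriate.
// In real code, keep pollTimer reachable (e.g. in a field) so it is not garbage collected.
System.Threading.Timer pollTimer = null;
pollTimer = new System.Threading.Timer(_ =>
{
    var items = WorkItemRepository.GetItemList(); // database call
    foreach (var item in items)
    {
        WorkItems.Add(item);
    }

    int nextDelayMs;
    if (items.Count > 100)      // busy: a big batch suggests more is coming soon
        nextDelayMs = 15000;
    else if (items.Count == 0)  // idle: nothing arrived, so back off
        nextDelayMs = 60000;
    else
        nextDelayMs = 30000;    // normal cadence

    pollTimer.Change(nextDelayMs, Timeout.Infinite); // re-arm for one more shot
}, null, 30000, Timeout.Infinite);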
