您将如何实现这个“WorkerChain”? .NET 中的功能?

发布于 2024-08-29 18:20:54 字数 2263 浏览 3 评论 0原文

编辑:我意识到为时已晚(?),我在第一次更新此问题时发布的所有代码对于大多数读者来说太多了。我实际上已经继续了 为任何有兴趣阅读的人写了一篇关于此主题的博客文章

与此同时,我保留了原来的问题,以简要介绍我想要解决的问题。

我还要指出的是,到目前为止,我(在我的博客上)发布的代码已经很好地经受住了测试。但我仍然对人们愿意向我提供的有关其清洁/稳健/性能*的任何和所有反馈感兴趣。

*我喜欢这个词并不真正意味着我们的想法,但我们无论如何,开发人员一直在使用它。


原始问题

抱歉,问题标题含糊不清——不知道如何简洁地概括我在下面提出的问题。 (如果具有编辑权限的人可以想到更具描述性的标题,请随意更改它。)

我需要的行为是这样的。我设想一个工作类在其构造函数中接受单个委托任务(为了简单起见,我将使其不可变——实例化后不能添加更多任务)。我将此任务称为 T。该类应该有一个简单的方法,例如 GetToWork,它将表现出以下行为:

  1. 如果工作线程当前没有运行 T,那么它现在就开始这样做。
  2. 如果工作线程当前正在运行 T,那么一旦完成,它将立即再次启动 T
  3. 当工作线程运行T时,GetToWork可以被调用任意多次;简单的规则是,在 T 的任何执行期间,如果 GetToWork 被调用至少一次T 将完成后再次运行(然后,如果在 T 运行时调用 GetToWork会再次重复运行,等等)。

现在,使用布尔开关就非常简单了。但这个类需要是线程安全的,我的意思是,上面的步骤 1 和 2 需要包含原子操作(至少我认为是这样)。

还增加了一层复杂性。我需要一个“工人链”类,它将由许多连接在一起的工人组成。一旦第一个工作程序完成,它实际上会在其之后的工作程序上调用 GetToWork ;同时,如果调用了它自己的GetToWork,它也会自行重新启动。从逻辑上讲,在上调用GetToWork本质上与在中的第一个worker上调用GetToWork相同(我完全希望该连锁店的工作人员不被公开访问)。

想象这个假设的“工人链”将如何表现的一种方法是将其与接力赛中的一支球队进行比较。假设有四个跑步者,从 W1W4,并将该链称为 C。如果我调用C.StartWork(),应该发生的事情是这样的:

  1. 如果W1位于他的起点(即什么也不做),他将开始朝跑>W2
  2. 如果W1已经跑向W2(即执行他的任务),那么一旦他到达W2,他将向 W2 发出开始的信号,立即返回到其起点,并且由于 StartWork 已被调用,因此再次开始向 W2 运行。
  3. W1到达W2的起点时,他会立即返回到自己的起点。
    1. 如果 W2 只是坐在那里,他会立即开始向 W3 跑去。
    2. 如果 W2 已经开始朝 W3 方向奔跑,那么 W2 在到达 W3 后就会再次出发并回到了起点。

上面的内容可能有点复杂并且写得不好。但希望您能了解基本概念。显然,这些工作人员将在自己的线程上运行。

另外,我想这个功能可能已经存在于某处?如果是这样,一定让我知道!

EDIT: It kind of occurred to me too late (?) that all the code I posted in my first update to this question was way too much for most readers. I've actually gone ahead and written a blog post about this topic for anyone who cares to read it.

In the meantime, I've left the original question in place, to give a brief glimpse at the problem I'd like to solve.

I'll also just note that the code I have posted (on my blog) has, thus far, stood up pretty well to testing. But I'm still interested in any and all feedback people are willing to give me on how clean/robust/performant* it is.

*I love how that word doesn't really mean what we think, but we developers use it all the time anyway.


Original Question

Sorry for the vague question title -- not sure how to encapsulate what I'm asking below succinctly. (If someone with editing privileges can think of a more descriptive title, feel free to change it.)

The behavior I need is this. I am envisioning a worker class that accepts a single delegate task in its constructor (for simplicity, I would make it immutable -- no more tasks can be added after instantiation). I'll call this task T. The class should have a simple method, something like GetToWork, that will exhibit this behavior:

  1. If the worker is not currently running T, then it will start doing so right now.
  2. If the worker is currently running T, then once it is finished, it will start T again immediately.
  3. GetToWork can be called any number of times while the worker is running T; the simple rule is that, during any execution of T, if GetToWork was called at least once, T will run again upon completion (and then if GetToWork is called while T is running that time, it will repeat itself again, etc.).

Now, this is pretty straightforward with a boolean switch. But this class needs to be thread-safe, by which I mean, steps 1 and 2 above need to comprise atomic operations (at least I think they do).

There is an added layer of complexity. I have need of a "worker chain" class that will consist of many of these workers linked together. As soon as the first worker completes, it essentially calls GetToWork on the worker after it; meanwhile, if its own GetToWork has been called, it restarts itself as well. Logically calling GetToWork on the chain is essentially the same as calling GetToWork on the first worker in the chain (I would fully intend that the chain's workers not be publicly accessible).

One way to imagine how this hypothetical "worker chain" would behave is by comparing it to a team in a relay race. Suppose there are four runners, W1 through W4, and let the chain be called C. If I call C.StartWork(), what should happen is this:

  1. If W1 is at his starting point (i.e., doing nothing), he will start running towards W2.
  2. If W1 is already running towards W2 (i.e., executing his task), then once he reaches W2, he will signal to W2 to get started, immediately return to his starting point and, since StartWork has been called, start running towards W2 again.
  3. When W1 reaches W2's starting point, he'll immediately return to his own starting point.
    1. If W2 is just sitting around, he'll start running immediately towards W3.
    2. If W2 is already off running towards W3, then W2 will simply go again once he's reached W3 and returned to his starting point.

The above is probably a little convoluted and written out poorly. But hopefully you get the basic idea. Obviously, these workers will be running on their own threads.

Also, I guess it's possible this functionality already exists somewhere? If that's the case, definitely let me know!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

橘亓 2024-09-05 18:20:54

使用信号量。每个工作线程都是一个具有以下代码(伪代码)的线程:

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

非零信号量意味着有人向当前线程发出信号以完成工作。在其输入行中获得非零信号量后,它会重置信号量(标记为无人发出信号),执行工作(同时可以再次发布信号量)并为下一个工作人员发布信号量。这个故事在下一个工人身上重演。

Use semaphores. Each worker is a thread with the following code (pseudocode):

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

A non-zero semaphore means that someone signaled the current thread to do the work. After gets a non zero semaphore in its entry line, it resets the semaphore (mark as no one signaled), do the work (meanwhile the semaphore can be posted again) and post the semaphore for the next worker. The story repeats in the next worker(s).

尐籹人 2024-09-05 18:20:54

这是一个天真的实现,您可能会从中受益匪浅。

注意:

据我了解,标量类型(即控制执行的 bool 标志)具有原子分配,使它们在这种情况下具有线程安全性,正如您所需要/想要的那样。

还有涉及信号量和其他策略的更复杂的可能性,但如果简单有效的话......

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));

                _busy = false;
            } while (_flagged);
        }
    }
}

A naive implementation that you may get some mileage from.

Note:

It is my understanding that scalar types, r.e. the bool flags controlling execution, have atomic assignment making them as thread safe as you would need/want in this scenario.

There are much more complex possibilities involving semaphores and other strategies, but if simple works....

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));

                _busy = false;
            } while (_flagged);
        }
    }
}
沦落红尘 2024-09-05 18:20:54

在我看来,你把这个问题过于复杂化了。我以前写过这些“管道”类;您所需要的只是一个工作人员队列,每个工作人员都有一个等待句柄,该句柄在操作完成后会收到信号。

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

这是一个测试程序,您可以使用它来验证操作是否始终以正确的顺序执行,并且同一操作只能同时执行一个:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

简短、简单、完全线程安全。

如果由于某种原因您需要从链中的特定步骤开始工作,那么您只需为 Start 添加一个重载:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        currentStage.Execute();
}

Edit

根据评论,我认为对内部 Stage 类进行一些小的更改应该足以获得您想要的行为。除了“就绪”事件之外,我们只需要添加一个“排队”事件即可。

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

如果某个阶段无法执行(即已经排队),还可以更改管道的 Start 方法来中断:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        if (!currentStage.Execute())
            break;
}

这里的概念也非常简单:

  • 阶段首先尝试立即获取就绪状态。如果成功,则开始运行。
  • 如果它无法获取就绪状态(即任务已经在运行),那么它会尝试获取排队状态。
    • 如果它获取了排队状态,那么它会等待就绪状态变为可用,然后释放排队状态。
    • 如果它也无法获取排队状态,则会放弃。

我再次阅读了您的问题和评论,我很确定这正是您想要做的,并且在安全性、吞吐量和限制之间提供了最佳权衡。

由于ThreadPool有时可能需要一段时间才能响应,因此如果您想将测试程序中的延迟提高到1000而不是100,真正看到“跳过”的发生。

Seems to me that you're overcomplicating this. I've written these "pipeline" classes before; all you need is a queue of workers each with a wait handle that gets signaled after the action is complete.

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

And here's a test program, which you can use to verify that actions always get executed in the correct order and only one of the same action can ever execute at once:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

Short, simple, completely thread-safe.

If for some reason you need to start working at a specific step in the chain instead, then you can just add an overload for Start:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        currentStage.Execute();
}

Edit

Based on comments, I think a few minor changes to the inner Stage class should be enough to get the kind of behaviour you want. We just need to add a "queued" event in addition to the "ready" event.

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

Also change the pipeline's Start method to break if a stage can't execute (i.e. is already queued):

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        if (!currentStage.Execute())
            break;
}

The concept here is pretty simple, again:

  • A stage first tries to immediately acquire the ready state. If it succeeds, then it starts running.
  • If it fails to acquire the ready state (i.e. the task is already running), then it tries to acquire the queued state.
    • If it gets the queued state, then it waits for the ready state to become available and then releases the queued state.
    • If it can't get the queued state either, then it gives up.

I've read over your question and comments again and I'm pretty sure this is exactly what you're trying to do, and gives the best trade-off between safety, throughput, and throttling.

Because the ThreadPool can sometimes take a while to respond, you should up the delay in the test program to 1000 instead of 100 if you want to really see the "skips" happening.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文