使用 CCR 有序处理事件的有效方法是什么?

发布于 2024-10-26 14:09:33 字数 1406 浏览 7 评论 0原文

我正在尝试使用 CCR 迭代器作为一项任务的解决方案,该任务需要并行处理大量数据源,其中每个源的数据都需要按顺序处理。所有提要都不相互依赖,因此可以按提要并行进行按顺序处理。

下面是一个带有一个整数馈送的快速而肮脏的模型,它只是以大约 1.5K/秒的速率将整数推入端口,然后使用 CCR 迭代器将它们拉出以保持按顺序处理保证。

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

令我大吃一惊的是,CCR 无法跟上 Corei7 机器的速度,队列大小无限制地增长。在另一项测试中,在约 100 Post/sec 的负载下测量从 Post() 到 Receive() 的延迟,每批中的第一个 Post() 和 Receive() 之间的延迟约为 1 毫秒。

我的模型有问题吗?如果是这样,使用 CCR 执行此操作的更好方法是什么?

I was experimenting with CCR iterators as a solution to a task that requires parallel processing of tons of data feeds, where the data from each feed needs to be processed in order. None of the feeds are dependent on each other, so the in-order processing can be paralleled per-feed.

Below is a quick and dirty mockup with one integer feed, which simply shoves integers into a Port at a rate of about 1.5K/second, and then pulls them out using a CCR iterator to keep the in-order processing guarantee.

class Program
{
    static Dispatcher dispatcher = new Dispatcher();
    static DispatcherQueue dispatcherQueue = 
       new DispatcherQueue("DefaultDispatcherQueue", dispatcher);
    static Port<int> intPort = new Port<int>();

    static void Main(string[] args)
    {
        Arbiter.Activate(
            dispatcherQueue,
            Arbiter.FromIteratorHandler(new IteratorHandler(ProcessInts)));

        int counter = 0;
        Timer t = new Timer( (x) => 
            { for(int i = 0; i < 1500; ++i) intPort.Post(counter++);}
              , null, 0, 1000);

        Console.ReadKey();
    }

    public static IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue;
            if( (currentValue = intPort) % 1000 == 0)
            {
                Console.WriteLine("{0}, Current Items In Queue:{1}", 
                  currentValue, intPort.ItemCount);
            }
        }
    }
}

What surprised me about this greatly was that CCR could not keep up on a Corei7 box, with the queue size growing without bounds. In another test to measure the latency from the Post() to the Receive() under a load or ~100 Post/sec., the latency between the first Post() and Receive() in each batch was around 1ms.

Is there something wrong with my mockup? If so, what is a better way of doing this using CCR?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦行七里 2024-11-02 14:09:33

是的,我同意,这确实看起来很奇怪。您的代码最初似乎执行得很顺利,但是在几千个项目之后,处理器使用率上升到性能确实低迷的地步。这让我感到不安,并表明框架中存在问题。在玩过你的代码后,我无法真正确定为什么会出现这种情况。我建议将此问题带到 Microsoft 机器人论坛 并看看你能否让 George Chrysanthakopoulos(或其他 CCR 大脑之一)告诉你问题所在。不过,我可以推测您现在的代码效率非常低。

你处理从港口“弹出”的物品的方式是非常低效的。本质上,每当端口中有一条消息时,迭代器就会被唤醒,并且它只处理一条消息(尽管事实上端口中可能还有数百条消息),然后挂在yield上,而控制权被传递回框架。当产生的接收者导致迭代器的另一次“唤醒”时,许多消息已经填充了端口。从调度程序中拉出一个线程来仅处理单个项目(当许多项目同时堆积时)几乎肯定不是获得良好吞吐量的最佳方法。

我已经修改了您的代码,以便在屈服之后,我们检查端口以查看是否有任何其他消息排队并处理它们,从而在我们屈服返回框架之前完全清空端口。我还对您的代码进行了一些重构,以使用 CcrServiceBase ,这简化了您正在执行的某些任务的语法:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

或者,您可以完全取消迭代器,因为它在您的应用程序中并没有得到很好的使用。示例(尽管我不完全确定这对处理顺序有什么影响):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

这两个示例都将吞吐量显着提高了几个数量级。希望这有帮助。

Yes, I agree, this does indeed seem weird. Your code seems initially to perform smoothly, but after a few thousand items, processor usage rises to the point where performance is really lacklustre. This disturbs me and suggests a problem in the framework. After a play with your code, I can't really identify why this is the case. I'd suggest taking this problem to the Microsoft Robotics Forums and seeing if you can get George Chrysanthakopoulos (or one of the other CCR brains) to tell you what the problem is. I can however surmise that your code as it stands is terribly inefficient.

The way that you are dealing with "popping" items from the Port is very inefficient. Essentially the iterator is woken each time there is a message in the Port and it deals with only one message (despite the fact that there might be several hundred more in the Port), then hangs on the yield while control is passed back to the framework. At the point that the yielded receiver causes another "awakening" of the iterator, many many messages have filled the Port. Pulling a thread from the Dispatcher to deal with only a single item (when many have piled up in the meantime) is almost certainly not the best way to get good throughput.

I've modded your code such that after the yield, we check the Port to see if there are any further messages queued and deal with them too, thereby completely emptying the Port before we yield back to the framework. I've also refactored your code somewhat to use CcrServiceBase which simplifies the syntax of some of the tasks you are doing:

internal class Test:CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;
    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest() {
        SpawnIterator(ProcessInts);
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 1500; ++i)
                                  intPort.Post(counter++);
                          }
                          ,
                          null,
                          0,
                          1000);
    }

    public IEnumerator<ITask> ProcessInts()
    {
        while (true)
        {
            yield return intPort.Receive();
            int currentValue = intPort;
            ReportCurrent(currentValue);
            while(intPort.Test(out currentValue))
            {
                ReportCurrent(currentValue);
            }
        }
    }

    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Alternatively, you could do away with the iterator completely, as it's not really well used in your example (although I'm not entirely sure what effect this has on the order of processing):

internal class Test : CcrServiceBase
{
    private readonly Port<int> intPort = new Port<int>();
    private Timer timer;

    public Test() : base(new DispatcherQueue("DefaultDispatcherQueue",
                                             new Dispatcher(0,
                                                            "dispatcher")))
    {

    }

    public void StartTest()
    {
        Activate(
            Arbiter.Receive(true,
                            intPort,
                            i =>
                            {
                                ReportCurrent(i);
                                int currentValue;
                                while (intPort.Test(out currentValue))
                                {
                                    ReportCurrent(currentValue);
                                }
                            }));
        var counter = 0;
        timer = new Timer(x =>
                          {
                              for (var i = 0; i < 500000; ++i)
                              {
                                  intPort.Post(counter++);
                              }
                          }
                          ,
                          null,
                          0,
                          1000);
    }



    private void ReportCurrent(int currentValue)
    {
        if (currentValue % 1000000 == 0)
        {
            Console.WriteLine("{0}, Current Items In Queue:{1}",
                              currentValue,
                              intPort.ItemCount);
        }
    }
}

Both these examples significantly increase throughput by orders of magnitude. Hope this helps.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文