如何通过ID RX分组和油门对象

发布于 2025-01-26 15:16:33 字数 4352 浏览 1 评论 0原文

我有同一类型的传入对象,但是如果对象属性Isthrottlable设置为false,则无论我不想限制ID,但如果Isthrottlable,则设置确实,我想每3秒通过ID防止对象。因此,如果具有相同ID的对象在50次以3秒为单位出现,我想发送最后一个对象的httpsend。

namespace BoatThrottle
{
    class MData
    {
        public int ID { get; set; }
        public bool IsThrottlable { get; set; }
        public string Description { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random();

            while (true)
            {
                var data = GenerateRandomObj(rand);
                SendData(data);
                Task.Delay(rand.Next(100, 2000));
            }
        }

        static MData GenerateRandomObj(Random rand)
        {
            return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
        }

        static void SendData(MData mData)
        {
            if (mData.IsThrottlable)
            {
                _doValues.OnNext(mData);
                var dd = ThrottledById(DoValues);

                var observable =
                   dd
                    .Throttle(TimeSpan.FromMilliseconds(3000.0))
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations());

                _subscription =
                    observable
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
                        .Subscribe(y =>
                        {
                            HTTPSend(y);
                        });

            }
            else
            {
                // MData object coming in IsThrottlable set to false always send this data NO throttling
                HTTPSend(mData);
            }

        }
        private static IDisposable? _subscription = null;

        public static IObservable<MData> ThrottledById(IObservable<MData> observable)
        {
            return observable.Buffer(TimeSpan.FromSeconds(3))
                .SelectMany(x =>
                    x.GroupBy(y => y.ID)
                    .Select(y => y.Last()));
        }

        private static readonly Subject<MData> _doValues = new Subject<MData>();

        public static IObservable<MData> DoValues { get { return _doValues; } }

        static void HTTPSend(MData mData)
        {
            Console.WriteLine("===============HTTP===>>  " + mData.ID + "  " + mData.Description + " " + mData.IsThrottlable);
        }
    }
}

编辑:

例如全部在3秒内收到

  • mdata ID = 1,isthrottlable = false,description =“ notify”

  • mdata id = 2,isthrottlable = true,description =“ notify1”

  • mdata id = 2,isthrottlable = true,description =“ notify2”

  • mdata id = 9,isthrottlable = false,description =“ notify2”

  • mdata id = 2,isthrottlable = true,description =“ notify3”

  • mdata id = 2,isthrottlable = true,description =“ notify4”

  • mdata id = 3,isthrottlable = true,description =“ notify”

  • mdata id = 4,isthrottlable = true,description =“ notify”

  • mdata id = 5,isthrottlable = true,description =“ notify1”

  • mdata id = 5,isthrottlable = true,description =“ notify2”

  • mdata id = 8,isthrottlable = true,description =“ notify1”

  • mdata id = 8,isthrottlable = true,description =“ notify2”

  • mdata id = 8,isthrottlable = true,description =“ notify3”

  • mdata id = 8,isthrottlable = true,description =“ notify4”

  • mdata id = 8,isthrottlable = true,description =“ notify5”

  • mdata id = 8,isthrottlable = true,description =“ notify6”

前3秒

  • mdata id = 1,isthrottlable = false = false,description =“ notify”
  • mdata id = 9,isthrottlable = false ,descrivation =“ notify2”
  • mdata id = 2,isthrottlable = true,description
  • =“ notify4” mdata id = 3 ,isthrottlable = true
  • = true,description =“ notify”
  • mdata id = 4,isthrottlable = true = true,descria = 5,isthrottlable = true,description =“ notify2”
  • mdata id = 8,isthrottlable = true,description =“ notify6”

I have incoming objects of the same type, but if An Object property IsThrottlable is set to false regardless of the ID I DON'T want to throttle it but if IsThrottlable is set to true I would like to throttle the object every 3 seconds by ID. So if an object with the same ID comes in 50 times with 3 seconds I would like to send the HTTPSend for the last Object.

namespace BoatThrottle
{
    class MData
    {
        public int ID { get; set; }
        public bool IsThrottlable { get; set; }
        public string Description { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random();

            while (true)
            {
                var data = GenerateRandomObj(rand);
                SendData(data);
                Task.Delay(rand.Next(100, 2000));
            }
        }

        static MData GenerateRandomObj(Random rand)
        {
            return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
        }

        static void SendData(MData mData)
        {
            if (mData.IsThrottlable)
            {
                _doValues.OnNext(mData);
                var dd = ThrottledById(DoValues);

                var observable =
                   dd
                    .Throttle(TimeSpan.FromMilliseconds(3000.0))
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations());

                _subscription =
                    observable
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
                        .Subscribe(y =>
                        {
                            HTTPSend(y);
                        });

            }
            else
            {
                // MData object coming in IsThrottlable set to false always send this data NO throttling
                HTTPSend(mData);
            }

        }
        private static IDisposable? _subscription = null;

        public static IObservable<MData> ThrottledById(IObservable<MData> observable)
        {
            return observable.Buffer(TimeSpan.FromSeconds(3))
                .SelectMany(x =>
                    x.GroupBy(y => y.ID)
                    .Select(y => y.Last()));
        }

        private static readonly Subject<MData> _doValues = new Subject<MData>();

        public static IObservable<MData> DoValues { get { return _doValues; } }

        static void HTTPSend(MData mData)
        {
            Console.WriteLine("===============HTTP===>>  " + mData.ID + "  " + mData.Description + " " + mData.IsThrottlable);
        }
    }
}

EDIT:

e.g ALL received within 3 seconds

  • MData ID = 1, IsThrottlable = False, Description = "Notify"

  • MData ID = 2, IsThrottlable = True, Description = "Notify1"

  • MData ID = 2, IsThrottlable = True, Description = "Notify2"

  • MData ID = 9, IsThrottlable = False, Description = "Notify2"

  • MData ID = 2, IsThrottlable = True, Description = "Notify3"

  • MData ID = 2, IsThrottlable = True, Description = "Notify4"

  • MData ID = 3, IsThrottlable = True, Description = "Notify"

  • MData ID = 4, IsThrottlable = True, Description = "Notify"

  • MData ID = 5, IsThrottlable = True, Description = "Notify1"

  • MData ID = 5, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify1"

  • MData ID = 8, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify3"

  • MData ID = 8, IsThrottlable = True, Description = "Notify4"

  • MData ID = 8, IsThrottlable = True, Description = "Notify5"

  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

Expected at the First 3 seconds:

  • MData ID = 1, IsThrottlable = False, Description = "Notify"
  • MData ID = 9, IsThrottlable = False, Description = "Notify2"
  • MData ID = 2, IsThrottlable = True, Description = "Notify4"
  • MData ID = 3, IsThrottlable = True, Description = "Notify"
  • MData ID = 4, IsThrottlable = True, Description = "Notify"
  • MData ID = 5, IsThrottlable = True, Description = "Notify2"
  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

雪落纷纷 2025-02-02 15:16:33

我决定按照您的问题中发布的最终实现,但这应该是一个答案,并以最愚蠢的RX方式清理查询。

这是我的代码版本:

public MainWindow()
{
    InitializeComponent();

    Debug.Print("========================");

    _subscription =
        Observable
            .Generate(0, x => true, x => x + 1,
                x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
                x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
            .GroupBy(m => m.IsThrottlable)
            .SelectMany(g =>
                g.Key
                ? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
                : g)
            .SelectMany(m => Observable.Start(() => HTTPSend(m)))
            .Subscribe();
}

最终.selectmany(m =&gt; observable.start(()=&gt; httpsend(m)))可能需要将其写入.select( m =&gt; observable.start(()=&gt; httpsend(m))。合并(1)

I decided to take your final implementation, as posted in your question, but it should be as an answer, and clean up the query for you in a way that is the most idiomatic Rx kind of way.

Here's my version of your code:

public MainWindow()
{
    InitializeComponent();

    Debug.Print("========================");

    _subscription =
        Observable
            .Generate(0, x => true, x => x + 1,
                x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
                x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
            .GroupBy(m => m.IsThrottlable)
            .SelectMany(g =>
                g.Key
                ? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
                : g)
            .SelectMany(m => Observable.Start(() => HTTPSend(m)))
            .Subscribe();
}

The final .SelectMany(m => Observable.Start(() => HTTPSend(m))) might need to be written as .Select(m => Observable.Start(() => HTTPSend(m))).Merge(1).

甜点 2025-02-02 15:16:33

一种方法是通过isthrottlable属性对序列进行分组。这样,您将获得一个嵌套序列,其中包含两个子序列,一个序列包含可节流元素,另一个包含不可碰撞的元素。然后,您可以相应地转换两个子序中的每个子序列,最后使用selectmany操作员将嵌套序列缩回到一个扁平序列中,该序列包含两个转换的子序列发出的元素。

包含不可碰撞元素的子序列不需要转换,因此您可以按原样返回。

包含可节流元素的子序列需要通过id属性将其进一步分组,从而产生仅包含具有相同ID的可节流元素的更薄的子序列。这些是需要进行限制的序列:

IObservable<MData> throttled = source
    .GroupBy(x => x.IsThrottlable)
    .SelectMany(g1 =>
    {
        if (!g1.Key) return g1; // Not throttleable, return it as is.
        return g1
            .GroupBy(x => x.ID)
            .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
    });

最后,您将获得一个平坦的序列,其中包含可节流和不可侵入的项目,并且已经由ID插入了可节流项目。

selectmany运算符本质上是选择+MERGE运算符的组合。

One way to do it is to group the sequence by the IsThrottlable property. This way you'll get a nested sequence that contains two subsequences, one containing the throttleable elements and one containing the non-throttleable elements. You can then transform each of the two subsequences accordingly, and finally use the SelectMany operator to flatten the nested sequence back to a flat sequence that contains the elements emitted by the two transformed subsequences.

The subsequence that contains the non-throttleable elements needs no transformation, so you can return it as is.

The subsequence that contains the throttleable elements needs to be grouped further by the ID property, producing even thinner subsequences that contain only throttleable elements having the same id. These are the sequences that need to be throttled:

IObservable<MData> throttled = source
    .GroupBy(x => x.IsThrottlable)
    .SelectMany(g1 =>
    {
        if (!g1.Key) return g1; // Not throttleable, return it as is.
        return g1
            .GroupBy(x => x.ID)
            .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
    });

At the end you'll get a flat sequence that contains both the throttleable and the non-throttleable items, with the throttleable items already throttled by id.

The SelectMany operator is essentially a combination of the Select+Merge operators.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文