使用反应性扩展 (Rx) 进行延迟和重复数据删除

发布于 2024-10-12 19:26:36 字数 1027 浏览 5 评论 0 原文

我想使用反应式扩展来转换一些消息并在小延迟后中继它们。

消息看起来像这样:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

输出看起来像这样:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

有几个要求:

  • 延迟的长度取决于消息的内容。
  • 每个消息都有一个 GroupId
  • 如果新消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一个消息,并在新的延迟周期后仅传输第二个消息。

给定一个 Observable和 Send 函数:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

我知道我可以使用 Select 来执行转换。

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • 如何应用指定延迟的消息? (请注意,这可能/应该会导致消息传送无序。)
  • 如何对具有相同 GroupId 的消息进行重复数据删除?
  • Rx能解决这个问题吗?
  • 还有其他方法可以解决这个问题吗?

I want to use Reactive Extensions to transform some messages and relay them after a small delay.

The messages look something like this:

class InMsg
{
   int GroupId { get; set; }
   int Delay { get; set; }
   string Content { get; set; }
}

The output looks something like this:

class OutMsg
{ 
   int GroupId { get; set; }
   string Content { get; set; }
   OutMsg(InMsg in)
   {
       GroupId = in.GroupId;
       Content = Transform(in.Content);  // function omitted
   }
}

There are a couple of requirements:

  • The length of the delay is dependent on the content of the message.
  • Each message has a GroupId
  • If a newer message comes in with the same GroupId as a delayed message awaiting transmission then the first message should be dropped and only the second one transmitted after a new delay period.

Given an Observable<InMsg> and a Send function:

IObservable<InMsg> inMsgs = ...;

void Send(OutMsg o)
{
     ... // publishes transformed messages
}

I understand that I can use Select to perform the transformation.

void SetUp()
{
     inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
  • How can I apply a message specify delay? (Note this might/should result in out of order delivery of messages.)
  • How can I de-dupe messages with the same GroupId?
  • Is Rx capable of solving this problem?
  • Is there another way of solving this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

dawn曙光 2024-10-19 19:26:36

您可以使用GroupBy来创建IGroupedObservable,使用Delay来延迟输出,使用Switch来确保更新的值替换组中以前的值:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

关于实现的注意事项:如果分组值在发送消息之后(即延迟之后)到达,它将启动新的延迟

You can use GroupBy to make an IGroupedObservable, Delay to delay the output, and Switch to make sure newer values replace previous values in their group:

IObservable<InMsg> inMessages;

inMessages
    .GroupBy(msg => msg.GroupId)
    .Select(group =>
        {
            return group.Select(groupMsg => 
                {
                    TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay);
                    OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here

                    return Observable.Return(outMsg).Delay(delay);
                })
                .Switch();
        })
        .Subscribe(outMsg => Console.Write("OutMsg received"));

A note on the implementation: if a grouped value arrived after the message is sent (ie. after the delay), it will start a new delay

江湖正好 2024-10-19 19:26:36

@Richard Szalay 的答案几乎对我有用(在.NET Framework 4.6上使用.NET Rx 3.1.1),但我必须在末尾添加.Merge()用于组合 IObservable> 结果的表达式,如下所示:

对于我(在 .NET Framework 4.6 上使用 .NET Rx 3.1.1),修复方法是添加 。 Merge() 到最后,如下所示:

var deduplicated = inputs
    .GroupBy(input => input)
    .Select(group =>
        group
        .Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
        .Switch())
    .Merge(); // <-- This is added to combine the partitioned results

@Richard Szalay's answer almost works for me (using .NET Rx 3.1.1 on .NET Framework 4.6), but I have to add .Merge() to the end of the expression to combine the IObservable<IObservable<OutMsg>> results, like so:

For me (using .NET Rx 3.1.1 on .NET Framework 4.6) the fix was to add .Merge() to the end, like so:

var deduplicated = inputs
    .GroupBy(input => input)
    .Select(group =>
        group
        .Select(input => Observable.Return(input).Delay(TimeSpan.FromSeconds(5)))
        .Switch())
    .Merge(); // <-- This is added to combine the partitioned results
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文