我想使用反应式扩展来转换一些消息并在小延迟后中继它们。
消息看起来像这样:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
输出看起来像这样:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
有几个要求:
- 延迟的长度取决于消息的内容。
- 每个消息都有一个 GroupId
- 如果新消息与等待传输的延迟消息具有相同的 GroupId,则应丢弃第一个消息,并在新的延迟周期后仅传输第二个消息。
给定一个 Observable和 Send 函数:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
我知道我可以使用 Select 来执行转换。
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- 如何应用指定延迟的消息? (请注意,这可能/应该会导致消息传送无序。)
- 如何对具有相同 GroupId 的消息进行重复数据删除?
- Rx能解决这个问题吗?
- 还有其他方法可以解决这个问题吗?
I want to use Reactive Extensions to transform some messages and relay them after a small delay.
The messages look something like this:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
The output looks something like this:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
There are a couple of requirements:
- The length of the delay is dependent on the content of the message.
- Each message has a GroupId
- If a newer message comes in with the same GroupId as a delayed message awaiting transmission then the first message should be dropped and only the second one transmitted after a new delay period.
Given an Observable<InMsg> and a Send function:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
I understand that I can use Select to perform the transformation.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- How can I apply a message specify delay? (Note this might/should result in out of order delivery of messages.)
- How can I de-dupe messages with the same GroupId?
- Is Rx capable of solving this problem?
- Is there another way of solving this?
发布评论
评论(2)
您可以使用
GroupBy
来创建IGroupedObservable
,使用Delay
来延迟输出,使用Switch
来确保更新的值替换组中以前的值:关于实现的注意事项:如果分组值在发送消息之后(即延迟之后)到达,它将启动新的延迟
You can use
GroupBy
to make anIGroupedObservable
,Delay
to delay the output, andSwitch
to make sure newer values replace previous values in their group:A note on the implementation: if a grouped value arrived after the message is sent (ie. after the delay), it will start a new delay
@Richard Szalay 的答案几乎对我有用(在.NET Framework 4.6上使用.NET Rx 3.1.1),但我必须在末尾添加> 结果的表达式,如下所示:
.Merge()
用于组合 IObservable对于我(在 .NET Framework 4.6 上使用 .NET Rx 3.1.1),修复方法是添加
。 Merge()
到最后,如下所示:@Richard Szalay's answer almost works for me (using .NET Rx 3.1.1 on .NET Framework 4.6), but I have to add
.Merge()
to the end of the expression to combine theIObservable<IObservable<OutMsg>>
results, like so:For me (using .NET Rx 3.1.1 on .NET Framework 4.6) the fix was to add
.Merge()
to the end, like so: