响应式框架消除了 Observable.Create 中的 while 循环
我正在处理来自网络的可观察字节流,我想将其提升到一层抽象。该格式有两个字节,其中包含下一条消息的长度。我想让它很好地适应反应式框架。到目前为止,我感觉不太对劲,所以我想知道我可能错过了哪些技巧来消除这里的 while 循环。
这是我想到的概念:
public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
return Observable.Create((IObserver<Stream>o)=>{
bool done = false;
var byteStream = byteStreamArg.Do(
b => { }, (ex) => { done = true; }, () => { done = true; });
while (!done)
{
var size = byteStream.Take(2).
Aggregate(0, (n, b) => (n << 8) + b).Single();
var buf = byteStream.Skip(2).Take(size);
var stream = new MemoryStream(buf.ToEnumerable().ToArray());
if (!done)
{
o.OnNext(stream);
}
}
return (() => {});
});
}
I am working with an observable byte stream, coming off of the network, and I would like to take that up one layer of abstraction. The format has two bytes that contain the length of the next message. I'd like to make this fit into the reactive framework pretty well. What I have so far feels not quite right, so I am wondering what tricks I may have missed to eliminate the while loop here.
Here's the concept I have in mind:
public static IObservable<Stream> GetToplevelStreams(IObservable<byte> byteStreamArg) {
return Observable.Create((IObserver<Stream>o)=>{
bool done = false;
var byteStream = byteStreamArg.Do(
b => { }, (ex) => { done = true; }, () => { done = true; });
while (!done)
{
var size = byteStream.Take(2).
Aggregate(0, (n, b) => (n << 8) + b).Single();
var buf = byteStream.Skip(2).Take(size);
var stream = new MemoryStream(buf.ToEnumerable().ToArray());
if (!done)
{
o.OnNext(stream);
}
}
return (() => {});
});
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
IObservable 这里有点奇怪 - 请记住,您正在返回一个“未来的流列表” - 我实际上只是返回一个
Stream
,或者可能是一个IObservable;
,其中每个数组代表一条消息。或者做得更好,并返回一个 IObservable另外,您的 While 循环使此非异步并且行为奇怪。更像这样的东西怎么样:
An IObservable is a bit weird here - remember that you're returning a "Future List of Streams" - I'd actually just return a
Stream
, or perhaps anIObservable<byte[]>
, where each array represents a message. Or do even better, and return anIObservable<ParsedMessage>
Also, your While loop makes this non-async and act strangely. How about something more like this: