如何使用 Rx 通过异步 WCF 服务轮询图像
我有一个异步 WCF 服务,它接受“URI”并返回图像(作为流)。
我想要做的是:
- 确保存在有效的 WCF 通道,如果没有创建它
- 则进行异步服务调用
- 成功时将图像保存到成员变量
- 如果出现异常,则关闭通道
- 无论失败还是成功,请等待 200ms 然后重新开始(永远循环或直到取消)
到目前为止,我已经想出了这个怪物:
private void PollImage(string imageUri)
{
const int pollingHertz = 1;
const int millisecondsTimeout = 1000 / pollingHertz;
Thread.Sleep(millisecondsTimeout);
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc = Observable.FromAsyncPattern<string, Stream>
(_channel.BeginGetImage, _channel.EndGetImage);
getImageFunc(imageUri)
.Finally(() => PollImage(imageUri))
.Subscribe(
stream => UpdateImageStream(imageUri, stream),
ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject) _channel).CloseOrAbort();
_channel = null;
});
}
我真的很想学习 Rx,但每次我尝试时我都会摸不着头脑。
有人愿意给我一些建议吗?谢谢
I have an async WCF service that takes a "URI" and returns an image (as a Stream).
Want I want to do is:
- Ensure a valid WCF channel exists, if no create it
- Make the async service call
- On success save the image to a member variable
- If I get an exception, close the channel
- Whether it fails or succeeds, wait 200ms then start again (looping forever or until cancelled)
So far I have come up with this monstrosity:
private void PollImage(string imageUri)
{
const int pollingHertz = 1;
const int millisecondsTimeout = 1000 / pollingHertz;
Thread.Sleep(millisecondsTimeout);
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc = Observable.FromAsyncPattern<string, Stream>
(_channel.BeginGetImage, _channel.EndGetImage);
getImageFunc(imageUri)
.Finally(() => PollImage(imageUri))
.Subscribe(
stream => UpdateImageStream(imageUri, stream),
ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject) _channel).CloseOrAbort();
_channel = null;
});
}
I really want to learn Rx but each time I try I get left scratching my head.
Anyone care to give me some pointers on this? Thanks
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
我为您提供了一个解决方案,但我建议更改您的
PollImage
方法,使其更像 Rx。签名应该如下所示:
您应该将
PollImage
视为一个可观察工厂,并且在您订阅返回的可观察之前,它实际上不会轮询图像。这种方法的优点是它使取消订阅成为可能 - 您的最后一个要点需要这样做 - 并且它干净地分离了轮询图像的代码和更新局部变量的代码。因此,对 PollImage 的调用如下所示:
实现如下所示:
query
observable 等待直到gapInterval
完成,然后调用WCF函数返回流,然后将流转换为图像。内部的 return 语句做了很多事情。
首先,它使用
Do
运算符捕获发生的任何异常,并像以前一样重置跟踪和通道。接下来,它调用
.Repeat()
来有效地重新运行查询,使其在再次调用 Web 服务之前等待gapInterval
。我可以在query
中使用Observable.Interval
而不是Observable.Timer
并删除对.Repeat()
的调用>,但这意味着对 Web 服务的调用每个gapInterval
都会启动,而不是在上次完成后等待那么长时间。接下来,它调用
.Retry()
,如果遇到异常,它会有效地重新启动可观察对象,以便订阅者永远不会看到异常。Do
运算符捕获错误,因此这是可以的。最后,它订阅观察者并返回 IDisposable ,允许调用代码取消订阅。
除了实现 getImageFromStream 函数之外,仅此而已。
现在请注意。许多人误解了订阅可观察量的工作原理,这可能导致难以发现错误。
以此为例:
s1
&s2
订阅xs
,但它们不是共享一个计时器,而是各自创建一个计时器。您创建了两个 Observable.Interval 内部运作实例,而不是一个。现在这是可观察量的正确行为。如果其中一个失败,另一个也不会,因为它们不共享任何内部结构——它们彼此隔离。
但是,在您的代码(以及我的代码)中,您存在潜在的线程问题,因为您在对
PollImage
的多次调用中共享_channel
。如果一个调用失败,它会重置通道,这可能会导致并发调用失败。我的建议是为每个调用创建一个新通道以防止并发问题。
我希望这有帮助。
I have a solution for you, but I'm going to suggest a change to your
PollImage
method to make it more Rx-like.The signature should look like this:
You should consider
PollImage
to be an observable factory, and it won't actually poll for images until you subscribe to the returned observable. The advantage with this approach is that it makes unsubscribing possible - your last bullet point requires this - and it cleanly separates the code that polls for images and the code that updates the local variables.So, the call to
PollImage
then looks like this:And the implementation looks like this:
The
query
observable waits until thegapInterval
is complete and then calls the WCF function to return the stream and then converts the stream to an image.The inner
return
statement does a number of things.First it uses a
Do
operator to capture any exceptions that occur and does your tracing and channel reset as before.Next it calls
.Repeat()
to cause query to be re-run effectively making it waitgapInterval
before calling the webservice again. I could have usedObservable.Interval
rather thanObservable.Timer
inquery
and drop the call to.Repeat()
, but this would have meant the calls to the webservice start everygapInterval
rather than wait that long after it completed last time.Next it calls
.Retry()
which effectively restarts the observable if it encounters an exception so that the subscriber never sees the exception. TheDo
operator captures the errors so this is OK.Finally it subscribes the observer and returns the
IDisposable
allowing the calling code to unsubscribe.Other than implementing the
getImageFromStream
function, that's about it.Now a word of caution. A lot of people misunderstand how subscribing to observables works and this can lead to hard to discover bugs.
Take this as an example:
Both
s1
&s2
subscribe toxs
, but rather than share a single timer they each create a timer. You have two instances of the internal workings ofObservable.Interval
created, not one.Now this is the correct behaviour for observables. In the event that one fails then the other won't because they don't share any internals - they are isolated from each other.
However, in your code (and mine for that matter) you have a potential threading issue because you share
_channel
across multiple calls toPollImage
. If one call fails it resets the channel and this can cause concurrent calls to then fail as a result.My suggestion is that you create a new channel for each call to prevent concurrency issues.
I hope this helps.
这就是我想到的(在一些帮助下!)...仍然不是“完美”,但似乎有效。
正如 @Enigma 所说,我现在已经摆脱了共享 _channel 并将其替换为捕获的本地变量。它有效,但我不明白 Rx enuf 来知道这是否是糟糕/有缺陷的方法。我怀疑至少有一种更清洁的方法。
除此之外,我的主要反对意见是 Do(),我称之为 EnsureChannel... 似乎有点臭。但是...它有效...
哦,我必须在 SelectMany 中包含 _(下划线),否则不会再次调用 GetImage。
This is what I came up with (with some help!)... still not "perfect" but seems to work.
As @Enigma said I've got rid of the shared _channel now and replaced it with a captured local var. It works but I don't understand Rx enuf to know if this is poor/buggy approach. I suspect there is a cleaner way at least.
Other than that, my main objection is the Do() where I call EnsureChannel... seems a bit smelly. But... it works...
Oh and I must have the _ (underscore) in the SelectMany or else the GetImage is not called again.
如果你真的想使用,那么人们已经回答了你的问题,如果你正在寻找替代方法,那么我建议你看看 TPL(任务等对象),它允许你从异步方法模式(你的网络)创建任务对象服务调用),然后使用取消令牌启动任务,以便在一段时间后如果任务未完成,您可以通过调用令牌取消方法来取消它。
If you really want to use then people have already answered your question and if you are looking for alternate ways then I would suggest have a look at TPL (Task etc objects) that allows you to create task object from a Async method pattern (your web service call) and then start the task with a cancellation token so that after some time if the task is not completed you can cancel it by calling token cancel method.