具有响应式扩展的并行 HttpWebRequest
我有一个“Image”类,它具有三个属性:Url、Id、Content。 我有 10 张这样的图像的列表。 这是一个 silverlight 应用程序。
我想创建一个方法:
IObservable<Image> DownloadImages(List<Image> imagesToDownload)
{
//start downloading all images in imagesToDownload
//OnImageDownloaded:
image.Content = webResponse.Content
yield image
}
此方法开始并行下载所有 10 个图像。 然后,当每次下载完成时,它将 Image.Content 设置为该下载的 WebResponse.Content。
结果应该是包含每个下载图像的 IObservable 流。
我是 RX 的初学者,我认为我想要的可以通过 ForkJoin 实现,但那是在反应式扩展 dll 的实验版本中,我不想使用它。
另外,我真的不喜欢下载依靠回调来检测所有图像已下载,然后调用 onCompleted()。
对我来说,这似乎不符合 Rx 精神。
我还发布了到目前为止我编码的解决方案,尽管我不喜欢我的解决方案,因为它又长/丑陋并且使用计数器。
return Observable.Create((IObserver<Attachment> observer) =>
{
int downloadCount = attachmentsToBeDownloaded.Count;
foreach (var attachment in attachmentsToBeDownloaded)
{
Action<Attachment> action = attachmentDDD =>
this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse =>
{
try
{
using (Stream stream = imageDownloadWebResponse.GetResponseStream())
{
attachment.FileContent = stream.ReadToEnd();
}
observer.OnNext(attachmentDDD);
lock (downloadCountLocker)
{
downloadCount--;
if (downloadCount == 0)
{
observer.OnCompleted();
}
}
} catch (Exception ex)
{
observer.OnError(ex);
}
});
action.Invoke(attachment);
}
return () => { }; //do nothing when subscriber disposes subscription
});
}
好吧,我确实根据吉姆的回答设法让它最终发挥作用。
var obs = from image in attachmentsToBeDownloaded.ToObservable()
from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool)
from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return)
let newImage = setAttachmentValue(image, responseStream.ReadToEnd())
select newImage;
其中 setAttachmentValue 只需执行 `image.Content = bytes;返回图像;
开始下载附件2代码:
private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment)
{
Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString();
WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri);
IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)();
return imageDownloadObservable;
}
I have a class "Image" with three properties: Url, Id, Content.
I have a list of 10 such images.
This is a silverlight app.
I want to create a method:
IObservable<Image> DownloadImages(List<Image> imagesToDownload)
{
//start downloading all images in imagesToDownload
//OnImageDownloaded:
image.Content = webResponse.Content
yield image
}
This method starts downloading all 10 images in parallel.
Then, when each downloads completes, it sets the Image.Content to the WebResponse.Content of that download.
The result should be an IObservable stream with each downloaded image.
I'm a beginner in RX, and I think what I want can be achieved with ForkJoin, but that's in an experimental release of reactive extensions dll which I don't want to use.
Also I really don't like download counting on callbacks to detect that all images have been downloaded and then call onCompleted().
Doesn't seem to be in the Rx spirit to me.
Also I post what solution I've coded so far, though I don't like my solution because its long/ugly and uses counters.
return Observable.Create((IObserver<Attachment> observer) =>
{
int downloadCount = attachmentsToBeDownloaded.Count;
foreach (var attachment in attachmentsToBeDownloaded)
{
Action<Attachment> action = attachmentDDD =>
this.BeginDownloadAttachment2(attachment).Subscribe(imageDownloadWebResponse =>
{
try
{
using (Stream stream = imageDownloadWebResponse.GetResponseStream())
{
attachment.FileContent = stream.ReadToEnd();
}
observer.OnNext(attachmentDDD);
lock (downloadCountLocker)
{
downloadCount--;
if (downloadCount == 0)
{
observer.OnCompleted();
}
}
} catch (Exception ex)
{
observer.OnError(ex);
}
});
action.Invoke(attachment);
}
return () => { }; //do nothing when subscriber disposes subscription
});
}
Ok, I did manage it to make it work in the end based on Jim's answer.
var obs = from image in attachmentsToBeDownloaded.ToObservable()
from webResponse in this.BeginDownloadAttachment2(image).ObserveOn(Scheduler.ThreadPool)
from responseStream in Observable.Using(webResponse.GetResponseStream, Observable.Return)
let newImage = setAttachmentValue(image, responseStream.ReadToEnd())
select newImage;
where setAttachmentValue just takes does `image.Content = bytes; return image;
BeginDownloadAttachment2 code:
private IObservable<WebResponse> BeginDownloadAttachment2(Attachment attachment)
{
Uri requestUri = new Uri(this.DownloadLinkBaseUrl + attachment.Id.ToString();
WebRequest imageDownloadWebRequest = HttpWebRequest.Create(requestUri);
IObservable<WebResponse> imageDownloadObservable = Observable.FromAsyncPattern<WebResponse>(imageDownloadWebRequest.BeginGetResponse, imageDownloadWebRequest.EndGetResponse)();
return imageDownloadObservable;
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我们稍微简化一下怎么样?获取您的图像列表并将其转换为可观察的。接下来,考虑使用 Observable.FromAsyncPattern 来管理服务请求。最后使用 SelectMany 来协调请求与响应。我正在对如何在此处获取文件流做出一些假设。本质上,如果您可以将 BeginInvoke/EndInvoke 委托传递到 FromAsyncPattern 中来满足您的服务请求,那么您就很好了。
How about we simplify this a bit. Take your image list and convert it to an observable. Next, consider using the Observable.FromAsyncPattern to manage the service requests. Finally use SelectMany to coordinate the request with the response. I'm making some assumptions on how you are getting the file streams here. Essentially if you can pass in the BeginInvoke/EndInvoke delegates into FromAsyncPattern for your service request you are good.