HttpWebRequest asynchronous calls filling up the stack, and stripped-down Reactive Extensions on Windows Phone
I am writing some code that downloads a file from a network resource to disk. Both the network reads and the disk writes are done asynchronously. The problem I am observing is that the asynchronous calls are in fact completing synchronously, so each new iteration adds another function call to the stack. Here is the code:
private IsolatedStorageFileStream fileStream = null;
private HttpWebRequest webRequest = null;
private Stream responseStream = null;
private long responsePosition = 0;
private static int BufferSize = 4096;
private byte[] bufferRead = new byte[BufferSize];
private void button4_Click(object sender, RoutedEventArgs e)
{
    string fileName = "TestFile.mp3";
    using( var store = IsolatedStorageFile.GetUserStoreForApplication() )
    {
        if( store.FileExists(fileName) )
        {
            store.DeleteFile(fileName);
        }
        fileStream = store.OpenFile(fileName, FileMode.CreateNew, FileAccess.Write, FileShare.Read);
        webRequest = WebRequest.Create(new Uri(mpsUri.Text)) as HttpWebRequest;
        var observableRequest = Observable.FromAsyncPattern<WebResponse>(webRequest.BeginGetResponse, webRequest.EndGetResponse);
        Observable.Timeout(observableRequest.Invoke(), TimeSpan.FromMinutes(2))
            .Subscribe(response => { ResponseCallback(response); }, exception => { TimeoutCallback(); });
    }
}
private void TimeoutCallback()
{
    webRequest.Abort();
    MessageBox.Show("Request timed-out");
}
private void ResponseCallback(WebResponse webResponse)
{
    if( (webResponse as HttpWebResponse).StatusCode != HttpStatusCode.OK )
    {
        MessageBox.Show("Download error1");
    }
    else
    {
        responseStream = webResponse.GetResponseStream();
        if( responsePosition != 0 )
        {
            responseStream.Position = responsePosition;
        }
        IAsyncResult readResult = responseStream.BeginRead(bufferRead, 0, BufferSize, new AsyncCallback(ReadCallback), null);
        return;
    }
    webResponse.Close();
    MessageBox.Show("Download error2");
}
private void ReadCallback(IAsyncResult asyncResult)
{
    int bytes = responseStream.EndRead(asyncResult);
    DLog.Info("store:{0}, current size:{1}", bytes, fileStream.Length);
    if( bytes > 0 )
    {
        fileStream.BeginWrite(bufferRead, 0, bytes, WriteCallback, null);
        return;
    }
    responseStream.Close();
    MessageBox.Show("Download error3");
}
private void WriteCallback(IAsyncResult asyncResult)
{
    DLog.Info("Stored!");
    responseStream.BeginRead(bufferRead, 0, BufferSize, new AsyncCallback(ReadCallback), null);
}
And here is a snippet of the (slightly edited) stack:
(...)
MainPage.ReadCallback(System.IAsyncResult asyncResult) Line 190 + 0x1b bytes C#
Stream.BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x6c bytes
MainPage.WriteCallback(System.IAsyncResult asyncResult) Line 200 + 0x1f bytes C#
Stream.BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x64 bytes
FileStream.BeginWrite(byte[] array, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x7c bytes
IsolatedStorageFileStream.BeginWrite(byte[] buffer, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x1a bytes
MainPage.ReadCallback(System.IAsyncResult asyncResult) Line 190 + 0x1b bytes C#
Stream.BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x6c bytes
MainPage.WriteCallback(System.IAsyncResult asyncResult) Line 200 + 0x1f bytes C#
Stream.BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x64 bytes
FileStream.BeginWrite(byte[] array, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x7c bytes
IsolatedStorageFileStream.BeginWrite(byte[] buffer, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x1a bytes
MainPage.ReadCallback(System.IAsyncResult asyncResult) Line 190 + 0x1b bytes C#
Stream.BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x6c bytes
MainPage.WriteCallback(System.IAsyncResult asyncResult) Line 200 + 0x1f bytes C#
Stream.BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x64 bytes
FileStream.BeginWrite(byte[] array, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x7c bytes
IsolatedStorageFileStream.BeginWrite(byte[] buffer, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x1a bytes
MainPage.ReadCallback(System.IAsyncResult asyncResult) Line 190 + 0x1b bytes C#
Stream.BeginRead(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x6c bytes
MainPage.WriteCallback(System.IAsyncResult asyncResult) Line 200 + 0x1f bytes C#
Stream.BeginWrite(byte[] buffer, int offset, int count, System.AsyncCallback callback, object state) + 0x64 bytes
FileStream.BeginWrite(byte[] array, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x7c bytes
IsolatedStorageFileStream.BeginWrite(byte[] buffer, int offset, int numBytes, System.AsyncCallback userCallback, object stateObject) + 0x1a bytes
(...)
This stack shows that the asynchronous callback is called immediately and the execution doesn't return to the calling function until probably all iterations are finished. I would expect that when a function calls another function asynchronously, the callback isn't called until the calling function completes and is removed from the stack. The current behavior, however, causes the stack to overflow after a couple of hundred iterations.
I am trying to work around this limitation (or should this really be called a bug?) by using Reactive Extensions. So I am trying to replace the iteration that reads the network stream with an observable pattern, e.g.:
var readerFunc = Observable.FromAsyncPattern<byte[], int, int, int>(responseStream.BeginRead, responseStream.EndRead);
But the problem here is that the Windows Phone version of the reactive library has been stripped down to support only two parameters plus the return type:
Observable.FromAsyncPattern<T1, T2, TResult>
So I can't define the reading function as I wanted above because that would require three parameters. Even the downloadable version of the library doesn't provide more parameters.
Finally, my questions are:
Is there any other way of working around the original problem with the asynchronous calls being called synchronously and filling up the stack, other than using Reactive Extensions?
If not, then how can I read from network stream and write to a file stream asynchronously using the limited version of Reactive Extensions that is available on Windows Phone?
Any help greatly appreciated!
Comments (2)
Quite straightforward, really: you need to check whether
IAsyncResult.CompletedSynchronously
is true. If it is, take action to break up the call stack. Perhaps you could use ThreadPool.QueueUserWorkItem or the WP7 equivalent?
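As a rough illustration of that suggestion, the sketch below checks CompletedSynchronously in the read callback and, when it is true, hands the continuation to ThreadPool.QueueUserWorkItem so that it starts on a fresh stack instead of nesting inside BeginRead. The names StackSafeCopy, CopyState, Start, BeginNextRead, OnRead and FinishRead are hypothetical and not from the question, and the write is done synchronously only to keep the example short. Since the stack trace in the question shows BeginWrite calling back inline as well, the same check would presumably be needed in a write callback.

```csharp
using System;
using System.IO;
using System.Threading;

public static class StackSafeCopy
{
    // Hypothetical helper type: state carried through the Begin/End calls.
    private sealed class CopyState
    {
        public Stream Source;
        public Stream Destination;
        public byte[] Buffer;
        public Action<Exception> Done; // receives null on success
    }

    // Starts copying source to destination; done is invoked when the copy ends or fails.
    public static void Start(Stream source, Stream destination, int bufferSize, Action<Exception> done)
    {
        var state = new CopyState
        {
            Source = source,
            Destination = destination,
            Buffer = new byte[bufferSize],
            Done = done
        };
        BeginNextRead(state);
    }

    private static void BeginNextRead(CopyState state)
    {
        try
        {
            state.Source.BeginRead(state.Buffer, 0, state.Buffer.Length, OnRead, state);
        }
        catch (Exception ex)
        {
            state.Done(ex);
        }
    }

    private static void OnRead(IAsyncResult result)
    {
        var state = (CopyState)result.AsyncState;
        if (result.CompletedSynchronously)
        {
            // We are still inside the BeginRead call: continue on a thread-pool
            // thread so the current stack can unwind first.
            ThreadPool.QueueUserWorkItem(_ => FinishRead(state, result));
        }
        else
        {
            // The callback is already on its own stack, so continue directly.
            FinishRead(state, result);
        }
    }

    private static void FinishRead(CopyState state, IAsyncResult result)
    {
        int bytes;
        try
        {
            bytes = state.Source.EndRead(result);
            if (bytes > 0)
            {
                // Synchronous write, for brevity only.
                state.Destination.Write(state.Buffer, 0, bytes);
            }
        }
        catch (Exception ex)
        {
            state.Done(ex);
            return;
        }

        if (bytes > 0) BeginNextRead(state);
        else state.Done(null);
    }
}
```

With the fields from the question, a call might look like StackSafeCopy.Start(responseStream, fileStream, BufferSize, error => { ... }). The done callback can run on a thread-pool thread, so UI work such as MessageBox.Show would likely need to be marshalled back to the UI thread.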
Here is an example for you. And yes, Windows Phone 7 does indeed have a limited version of Rx that accepts only two parameters and a result (while the full version offers 29 overloads and up to 14 parameters).
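The example this answer refers to is not preserved here. One common way to live with the two-parameter limit, shown as a sketch rather than as the answerer's code, is to capture the stream and the buffer in a closure so that only offset and count remain as parameters. The helper names TwoParameterRead, MakeBegin and MakeEnd are hypothetical. The delegate shapes are assumed to match what the FromAsyncPattern<T1, T2, TResult> overload quoted in the question expects.

```csharp
using System;
using System.IO;

public static class TwoParameterRead
{
    // Builds a Begin delegate whose only leading parameters are offset and count.
    // The stream and the buffer are captured by the closure instead of being
    // passed as arguments.
    public static Func<int, int, AsyncCallback, object, IAsyncResult> MakeBegin(Stream stream, byte[] buffer)
    {
        return (offset, count, callback, state) =>
            stream.BeginRead(buffer, offset, count, callback, state);
    }

    // The End side needs no adapting: EndRead already maps IAsyncResult to int.
    public static Func<IAsyncResult, int> MakeEnd(Stream stream)
    {
        return stream.EndRead;
    }

    // Assumed usage with Rx (not compiled here, requires the Rx library):
    //   var read = Observable.FromAsyncPattern<int, int, int>(
    //       TwoParameterRead.MakeBegin(responseStream, bufferRead),
    //       TwoParameterRead.MakeEnd(responseStream));
    //   read(0, BufferSize).Subscribe(bytes => { /* write bytes to the file */ });
}
```

This only addresses the overload limit. Whether wrapping the read in Rx also avoids the nested callbacks depends on how the underlying stream completes its reads, so the CompletedSynchronously check may still be needed.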