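Reading from a HttpResponseStream fails
I'm running into an issue where reading from a HttpResponseStream fails because the StreamReader that I wrap around it reads faster than the response stream receives the actual response. I'm retrieving a reasonably small file (around 60k), but the parser which processes the response into an actual object fails because it hits an unexpected character (code 65535), which from experience I know to be the character produced when you read from a StreamReader and there are no further characters available.
For the record, I know that the content being returned is valid and will parse correctly, since the failure occurs at a different point in the file each time I run the code. It's the parser.Load() line in the following code where it fails.
Is there a way to ensure I've read all the content before attempting to parse it, short of copying the response stream into a MemoryStream or string and then processing it?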
/// <summary>
/// Makes a Query where the expected Result is an RDF Graph ie. CONSTRUCT and DESCRIBE Queries
/// </summary>
/// <param name="sparqlQuery">SPARQL Query String</param>
/// <returns>RDF Graph</returns>
public Graph QueryWithResultGraph(String sparqlQuery)
{
try
{
//Build the Query URI
StringBuilder queryUri = new StringBuilder();
queryUri.Append(this._endpoint.ToString());
queryUri.Append("?query=");
queryUri.Append(Uri.EscapeDataString(sparqlQuery));
if (!this._defaultGraphUri.Equals(String.Empty))
{
queryUri.Append("&default-graph-uri=");
//EscapeDataString also escapes reserved characters such as '&' and '#' in the parameter value
queryUri.Append(Uri.EscapeDataString(this._defaultGraphUri));
}
//Make the Query via HTTP
HttpWebResponse httpResponse = this.DoQuery(new Uri(queryUri.ToString()),false);
//Set up an Empty Graph ready
Graph g = new Graph();
g.BaseURI = this._endpoint;
//Parse into a Graph based on Content Type
String ctype = httpResponse.ContentType;
IRDFReader parser = MIMETypesHelper.GetParser(ctype);
parser.Load(g, new StreamReader(httpResponse.GetResponseStream()));
return g;
}
catch (UriFormatException uriEx)
{
//URI Format Invalid
throw new Exception("The format of the URI was invalid", uriEx);
}
catch (WebException webEx)
{
//Some sort of HTTP Error occurred
throw new Exception("A HTTP Error occurred", webEx);
}
catch (RDFException)
{
//Some problem with the RDF or Parsing thereof
throw;
}
catch (Exception)
{
//Other Exception
throw;
}
}
/// <summary>
/// Internal Helper Method which executes the HTTP Requests against the SPARQL Endpoint
/// </summary>
/// <param name="target">URI to make Request to</param>
/// <param name="sparqlOnly">Indicates if only SPARQL Result Sets should be accepted</param>
/// <returns>HTTP Response</returns>
private HttpWebResponse DoQuery(Uri target, bool sparqlOnly)
{
//Expect errors in this function to be handled by the calling function
//Set-up the Request
HttpWebRequest httpRequest;
HttpWebResponse httpResponse;
httpRequest = (HttpWebRequest)WebRequest.Create(target);
//Use HTTP GET/POST according to user set preference
if (!sparqlOnly)
{
httpRequest.Accept = MIMETypesHelper.HTTPAcceptHeader();
//For the time being drop the application/json as this doesn't play nice with Virtuoso
httpRequest.Accept = httpRequest.Accept.Replace("," + MIMETypesHelper.JSON[0], String.Empty);
}
else
{
httpRequest.Accept = MIMETypesHelper.HTTPSPARQLAcceptHeader();
}
httpRequest.Method = this._httpMode;
httpRequest.Timeout = this._timeout;
//HTTP Debugging
if (Options.HTTPDebugging)
{
Tools.HTTPDebugRequest(httpRequest);
}
httpResponse = (HttpWebResponse)httpRequest.GetResponse();
//HTTP Debugging
if (Options.HTTPDebugging)
{
Tools.HTTPDebugResponse(httpResponse);
}
return httpResponse;
}
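The "code 65535" symptom described in the question can be reproduced without any network involved. The following is a minimal sketch, assuming the parser casts the result of Read() to a char without first checking for -1; the class and method names are hypothetical and are not part of the original code.

```csharp
using System;
using System.IO;

public static class EndOfInputDemo
{
    // Returns the character code a consumer sees if it casts the result of
    // Read() to char without checking for the -1 end-of-input marker.
    public static int UncheckedReadAtEnd()
    {
        using (TextReader reader = new StringReader(String.Empty))
        {
            int raw = reader.Read();        // -1: no characters available
            char c = unchecked((char)raw);  // -1 wraps around to U+FFFF
            return c;                       // 65535
        }
    }
}
```

If the parser reports character 65535, it has most likely treated a "no data" result as a real character.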
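Edit
To clarify what I already stated: this is not a bug in the parser, it is an issue of the StreamReader reading faster than the response stream provides data. I can get around this by doing the following, but would like suggestions for better or more elegant solutions: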
//Parse into a Graph based on Content Type
String ctype = httpResponse.ContentType;
IRDFReader parser = MIMETypesHelper.GetParser(ctype);
Stream response = httpResponse.GetResponseStream();
MemoryStream temp = new MemoryStream();
Tools.StreamCopy(response, temp);
response.Close();
temp.Seek(0, SeekOrigin.Begin);
parser.Load(g, new StreamReader(temp));
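Tools.StreamCopy is a helper whose source is not shown here. As a rough sketch of what such a copy typically looks like, assuming it simply drains the input into the output, the hypothetical CopyAll below loops until Read returns 0, which is the only result that signals the end of the stream.

```csharp
using System;
using System.IO;

public static class StreamCopySketch
{
    // Hypothetical stand-in for Tools.StreamCopy; not the author's implementation.
    public static void CopyAll(Stream input, Stream output)
    {
        byte[] buffer = new byte[4096];
        int read;
        // A short read only means less data was available right now;
        // Stream.Read returns 0 only once the end of the stream is reached.
        while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
        {
            output.Write(buffer, 0, read);
        }
    }
}
```

On .NET 4.0 and later, the built-in Stream.CopyTo method does the same job.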
Edit 2
BlockingStreamReader class as per Eamon's suggestion:
/// <summary>
/// A wrapper to a Stream which does all its Read() and Peek() calls using ReadBlock() to handle slow underlying streams (eg Network Streams)
/// </summary>
public sealed class BlockingStreamReader : StreamReader
{
private bool _peeked = false;
private int _peekChar = -1;
public BlockingStreamReader(StreamReader reader) : base(reader.BaseStream) { }
public BlockingStreamReader(Stream stream) : base(stream) { }
public override int Read()
{
if (this._peeked)
{
this._peeked = false;
return this._peekChar;
}
else
{
if (this.EndOfStream) return -1;
char[] cs = new char[1];
base.ReadBlock(cs, 0, 1);
return cs[0];
}
}
public override int Peek()
{
if (this._peeked)
{
return this._peekChar;
}
else
{
if (this.EndOfStream) return -1;
this._peeked = true;
char[] cs = new char[1];
base.ReadBlock(cs, 0, 1);
this._peekChar = cs[0];
return this._peekChar;
}
}
public new bool EndOfStream
{
get
{
return (base.EndOfStream && !this._peeked);
}
}
}
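Edit 3
Here is a much improved solution which can wrap any TextReader and provides an EndOfStream property. It uses an internal buffer which is filled by calling ReadBlock() on the wrapped TextReader. All the Read() methods of the reader can then be defined in terms of this buffer; the buffer size is configurable: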
/// <summary>
/// The BlockingTextReader is an implementation of a <see cref="TextReader">TextReader</see> designed to wrap other readers which may or may not have high latency.
/// </summary>
/// <remarks>
/// <para>
/// This is designed to avoid premature detection of end of input when the input has high latency and the consumer tries to read from the input faster than it can return data. All methods are defined by using an internal buffer which is filled using the <see cref="TextReader.ReadBlock">ReadBlock()</see> method of the underlying <see cref="TextReader">TextReader</see>
/// </para>
/// </remarks>
public sealed class BlockingTextReader : TextReader
{
private char[] _buffer;
private int _pos = -1;
private int _bufferAmount = -1;
private bool _finished = false;
private TextReader _reader;
public const int DefaultBufferSize = 1024;
public BlockingTextReader(TextReader reader, int bufferSize)
{
if (reader == null) throw new ArgumentNullException("reader", "Cannot read from a null TextReader");
if (bufferSize < 1) throw new ArgumentException("bufferSize must be >= 1", "bufferSize");
this._reader = reader;
this._buffer = new char[bufferSize];
}
public BlockingTextReader(TextReader reader)
: this(reader, DefaultBufferSize) { }
public BlockingTextReader(Stream input, int bufferSize)
: this(new StreamReader(input), bufferSize) { }
public BlockingTextReader(Stream input)
: this(new StreamReader(input)) { }
private void FillBuffer()
{
this._pos = -1;
if (this._finished)
{
this._bufferAmount = 0;
}
else
{
this._bufferAmount = this._reader.ReadBlock(this._buffer, 0, this._buffer.Length);
if (this._bufferAmount == 0 || this._bufferAmount < this._buffer.Length) this._finished = true;
}
}
public override int ReadBlock(char[] buffer, int index, int count)
{
if (count == 0) return 0;
if (buffer == null) throw new ArgumentNullException("buffer");
if (index < 0) throw new ArgumentOutOfRangeException("index", "Index must be >= 0");
if (count < 0) throw new ArgumentOutOfRangeException("count", "Count must be >= 0");
if ((buffer.Length - index) < count) throw new ArgumentException("Buffer too small");
if (this._bufferAmount == -1 || this._pos >= this._bufferAmount)
{
if (!this._finished)
{
this.FillBuffer();
if (this.EndOfStream) return 0;
}
else
{
return 0;
}
}
this._pos = Math.Max(0, this._pos);
if (count <= this._bufferAmount - this._pos)
{
//If we have sufficient things buffered to fulfill the request just copy the relevant stuff across
Array.Copy(this._buffer, this._pos, buffer, index, count);
this._pos += count;
return count;
}
else
{
int copied = 0;
while (copied < count)
{
int available = this._bufferAmount - this._pos;
if (count < copied + available)
{
//We can finish fulfilling this request this round
int toCopy = Math.Min(available, count - copied);
Array.Copy(this._buffer, this._pos, buffer, index + copied, toCopy);
copied += toCopy;
this._pos += toCopy;
return copied;
}
else
{
//Copy everything we currently have available
Array.Copy(this._buffer, this._pos, buffer, index + copied, available);
copied += available;
this._pos = this._bufferAmount;
if (!this._finished)
{
//If we haven't reached the end of the input refill our buffer and continue
this.FillBuffer();
if (this.EndOfStream) return copied;
this._pos = 0;
}
else
{
//Otherwise we have reached the end of the input so just return what we've managed to copy
return copied;
}
}
}
return copied;
}
}
public override int Read(char[] buffer, int index, int count)
{
return this.ReadBlock(buffer, index, count);
}
public override int Read()
{
if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
{
if (!this._finished)
{
this.FillBuffer();
if (this.EndOfStream) return -1;
}
else
{
return -1;
}
}
this._pos++;
return (int)this._buffer[this._pos];
}
public override int Peek()
{
if (this._bufferAmount == -1 || this._pos >= this._bufferAmount - 1)
{
if (!this._finished)
{
this.FillBuffer();
if (this.EndOfStream) return -1;
}
else
{
return -1;
}
}
return (int)this._buffer[this._pos + 1];
}
public bool EndOfStream
{
get
{
return this._finished && (this._pos >= this._bufferAmount - 1);
}
}
public override void Close()
{
this._reader.Close();
}
protected override void Dispose(bool disposing)
{
//Dispose the wrapped reader once, and only on an explicit Dispose
if (disposing) this._reader.Dispose();
base.Dispose(disposing);
}
}
Comments (2)
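Without knowing the specifics of the parser you're using, I can only guess at the bug, but there is a fairly easy mistake that the .NET Framework I/O libraries almost encourage you to make...
Are you aware that Streams and TextReaders may read fewer bytes/characters than requested?
In particular, the docs for TextReader.Read(char[] buffer, int index, int count) say:
Emphasis mine.
For example, if you call reader.Read(buffer, 0, 100), you cannot assume that 100 characters have been read.
Edit: It's very likely that the parser does assume this, which explains the behavior you observe. If you fully cache the stream in a MemoryStream, there will always be enough characters to fulfill the request; if you don't, the parser will receive fewer characters than requested at unpredictable times, whenever the underlying stream is "slow".
Edit2: You can fix your bug by replacing all instances of TextReader.Read() in the parser with TextReader.ReadBlock().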
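The difference between the two calls can be seen with a reader that deliberately returns data in small pieces. This is only an illustrative sketch: TricklingReader and ShortReadDemo are hypothetical names, and the 3-character limit is an arbitrary stand-in for a slow network stream.

```csharp
using System;
using System.IO;

// Hypothetical reader that hands out at most 3 characters per Read call.
// Only the buffer overload is overridden, which is all this demo uses.
public sealed class TricklingReader : TextReader
{
    private readonly string _text;
    private int _pos;

    public TricklingReader(string text) { _text = text; }

    public override int Read(char[] buffer, int index, int count)
    {
        int n = Math.Min(3, Math.Min(count, _text.Length - _pos));
        _text.CopyTo(_pos, buffer, index, n);
        _pos += n;
        return n; // 0 only when the text is exhausted
    }
}

public static class ShortReadDemo
{
    // A single Read call may return fewer characters than requested.
    public static int CharsFromRead(string text, int requested)
    {
        char[] buffer = new char[requested];
        return new TricklingReader(text).Read(buffer, 0, requested);
    }

    // ReadBlock keeps calling Read until the request is filled or input ends.
    public static int CharsFromReadBlock(string text, int requested)
    {
        char[] buffer = new char[requested];
        return new TricklingReader(text).ReadBlock(buffer, 0, requested);
    }
}
```

With a ten-character input and a request for eight characters, Read returns 3 while ReadBlock returns 8. A short count from ReadBlock therefore means the input has really ended.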
To support a blocking read scenario, rather than subclassing StreamReader, you can subclass TextReader: this avoids issues with EndOfStream, and it means you can make any reader blocking, not just StreamReaders:
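The code that accompanied this answer is not present here. As a minimal sketch of the idea, and not the answerer's actual implementation, the hypothetical BlockingReaderSketch below wraps any TextReader and routes every read through ReadBlock() on the inner reader, keeping a one-character lookahead for Peek().

```csharp
using System;
using System.IO;

public sealed class BlockingReaderSketch : TextReader
{
    private const int NothingPeeked = -2;
    private readonly TextReader _inner;
    private readonly char[] _one = new char[1];
    private int _peeked = NothingPeeked; // -1 once end of input has been seen

    public BlockingReaderSketch(TextReader inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        _inner = inner;
    }

    // Fetches one character via ReadBlock, or -1 at end of input.
    private int NextChar()
    {
        return _inner.ReadBlock(_one, 0, 1) == 1 ? _one[0] : -1;
    }

    public override int Peek()
    {
        if (_peeked == NothingPeeked) _peeked = NextChar();
        return _peeked;
    }

    public override int Read()
    {
        int c = Peek();
        if (c != -1) _peeked = NothingPeeked; // consume the lookahead
        return c;
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (count == 0 || _peeked == -1) return 0;
        int copied = 0;
        if (_peeked >= 0)
        {
            // Hand back the character consumed by an earlier Peek first
            buffer[index] = (char)_peeked;
            _peeked = NothingPeeked;
            copied = 1;
        }
        return copied + _inner.ReadBlock(buffer, index + copied, count - copied);
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing) _inner.Dispose();
        base.Dispose(disposing);
    }
}
```

Because the wrapper derives from TextReader, it can be handed to any consumer that accepts a TextReader, for example a wrapper created over new StreamReader(responseStream).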