WCF 服务,由于“意外的文件结束”而引发流异常对于传入流
我编写了一个交换传输代理,用于将电子邮件正文上传到 WCF 服务。该服务与 Exchange 位于同一台机器上,侦听 localhost:1530 以接收传入的 TCP 连接。传输代理是使用 .NET 3.5 Framework 实现的,并且该服务在 .NET 4.0 框架上实现的 Windows 服务中自托管。
我的问题:为什么流在阅读完成之前就终止了,我该如何修复它?
服务合同的定义如下:
[ServiceContract]
public interface IReceiveService
{
[OperationContract]
Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);
[OperationContract]
void ImportMessageBody(IMessageBody body);
[OperationContract]
void ImportMessageAttachment(IMessageAttachment attachment);
}
更新:我已重新订购这是为了让人们更容易快速阅读问题,而不必阅读我的描述的其余部分,这很长。显示我如何启动 Task
来处理请求的第一部分似乎是问题所在。事实上,如果我注释掉 Task.Factory.StartNew()
部分,Stream.CopyTo
就可以工作。
在我的服务实现中,我尝试使用 Stream.CopyTo 将传入流复制到临时文件,如下所示:
public void ImportMessageBody(IMessageBody body)
{
Task.Factory.StartNew(() =>
{
string fileName = GetFileNameFromMagicalPlace();
using (FileStream fs = new FileStream(fileName, FileMode.Append))
{
body.Data.CopyTo(fs); // <---- throws System.IOException right here
}
});
}
异常错误为:“错误:读取流时引发异常。”堆栈跟踪:
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.CopyTo...
存在内部异常:
System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope.
at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
at System.Xml.XmlBufferReader.GetByteHard()
at System.Xml.XmlBufferReader.ReadMultiByteUInt31()
at System.Xml.XmlBinaryReader.ReadName(StringHandle handle)
at System.Xml.XmlBinaryReader.ReadNode()
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
其他详细信息如下。
IMessageBody
的定义如下:
[MessageContract]
public class IMessageBody
{
[MessageHeader]
public Guid MessageID { get; set; }
[MessageHeader]
public string Format { get; set; }
[MessageBodyMember]
public System.IO.Stream Data { get; set; }
}
显示相关位的传输代理的缩写版本(我希望):
public class Agent : RoutingAgent
{
public delegate void PostingDelegate(MailItem item);
IReceiveService service;
public Agent()
{
string tcpServiceUri = "net.tcp://localhost:1530";
NetTcpBinding endpointBinding = new NetTcpBinding();
endpointBinding.TransferMode = TransferMode.Streamed;
ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(IReceiveService)),
endpointBinding,
new EndpointAddress(tcpServiceUri));
ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
service = factory.CreateChannel();
this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
}
void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
{
if (TheseAreTheDroidsImLookingFor(e))
{
PostingDelegate del = PostData;
del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
}
}
void PostData(MailItem item)
{
// Body class is basically direct implementation of IMessageBody
// with a constructor to set up the public properties from MailItem.
var body = new Body(item);
service.ImportMessageBody(body);
}
void CompletePost(IAsyncResult ar)
{
var context = ar.AsyncState as AgentAsyncContext;
context.Complete();
}
}
最后,服务实现的托管如下:
string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
try
{
host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
host.AddDefaultEndpoints();
var endpoint = host.Description.Endpoints.First();
((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;
trace.Log(Level.Debug,String.Format("Created service host: {0}", host));
host.Open();
trace.Log(Level.Debug,"Opened service host.");
}
catch (Exception e)
{
string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
trace.Log(Level.Debug,message);
trace.Log(Level.Error, message);
}
I've written an exchange transport agent that uploads the body of an email message to a WCF service. The service sits on the same box as Exchange, listening on localhost:1530 to receive incoming TCP connections. The transport agent is implemented with the .NET 3.5 Framework, and the service is self-hosted in a Windows Service implemented on the .NET 4.0 framework.
My question: why is the stream terminated before reading is complete, and how do I fix it?
The service contract is defined like this:
[ServiceContract]
public interface IReceiveService
{
[OperationContract]
Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);
[OperationContract]
void ImportMessageBody(IMessageBody body);
[OperationContract]
void ImportMessageAttachment(IMessageAttachment attachment);
}
Update: I've re-ordered this to make it easier for someone to quickly read the problem without necessarily having to read the rest of my description, which is long. This first bit showing how I kick off a Task
to process requests seems to be the problem. Indeed, if I comment out the Task.Factory.StartNew()
part, the Stream.CopyTo
works.
In my service implementation, I try to use Stream.CopyTo
to copy the incoming stream to a temp file like this:
public void ImportMessageBody(IMessageBody body)
{
Task.Factory.StartNew(() =>
{
string fileName = GetFileNameFromMagicalPlace();
using (FileStream fs = new FileStream(fileName, FileMode.Append))
{
body.Data.CopyTo(fs); // <---- throws System.IOException right here
}
});
}
The exception error is: "error: An exception has been thrown when reading the stream." The stack trace:
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.CopyTo...
There is an inner exception:
System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope.
at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
at System.Xml.XmlBufferReader.GetByteHard()
at System.Xml.XmlBufferReader.ReadMultiByteUInt31()
at System.Xml.XmlBinaryReader.ReadName(StringHandle handle)
at System.Xml.XmlBinaryReader.ReadNode()
at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count)
Other details follow.
IMessageBody
is defined like this:
[MessageContract]
public class IMessageBody
{
[MessageHeader]
public Guid MessageID { get; set; }
[MessageHeader]
public string Format { get; set; }
[MessageBodyMember]
public System.IO.Stream Data { get; set; }
}
An abbreviated version of the transport agent showing the relevant bits (I hope):
public class Agent : RoutingAgent
{
public delegate void PostingDelegate(MailItem item);
IReceiveService service;
public Agent()
{
string tcpServiceUri = "net.tcp://localhost:1530";
NetTcpBinding endpointBinding = new NetTcpBinding();
endpointBinding.TransferMode = TransferMode.Streamed;
ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
ContractDescription.GetContract(typeof(IReceiveService)),
endpointBinding,
new EndpointAddress(tcpServiceUri));
ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
service = factory.CreateChannel();
this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
}
void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
{
if (TheseAreTheDroidsImLookingFor(e))
{
PostingDelegate del = PostData;
del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
}
}
void PostData(MailItem item)
{
// Body class is basically direct implementation of IMessageBody
// with a constructor to set up the public properties from MailItem.
var body = new Body(item);
service.ImportMessageBody(body);
}
void CompletePost(IAsyncResult ar)
{
var context = ar.AsyncState as AgentAsyncContext;
context.Complete();
}
}
Lastly, the service implementation is hosted like this:
string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
try
{
host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
host.AddDefaultEndpoints();
var endpoint = host.Description.Endpoints.First();
((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;
trace.Log(Level.Debug,String.Format("Created service host: {0}", host));
host.Open();
trace.Log(Level.Debug,"Opened service host.");
}
catch (Exception e)
{
string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
trace.Log(Level.Debug,message);
trace.Log(Level.Error, message);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
好吧,代码太多了。您应该尝试将代码最小化为尽可能小的可重现示例。我想问题可能出在这里:
默认情况下应用了
OperationBehavior
此行为包含默认设置为 true 的AutoDisposeParameter
,其中 在以下情况下处置所有一次性参数操作结束。因此,我希望该操作在您的Task
能够处理整个流之前结束。Well it is too much code. You should try to minimize code to smallest possible reproducible example. I guess the problem can be here:
There is
OperationBehavior
applied by default this behavior containsAutoDisposeParameter
set by default to true which disposes all disposable parameters when the operation ends. So I expect that operation ends before yourTask
is able to process the whole stream.问题本质上是我通过调用
Task.Factory.StartNew
进行了不必要的并发操作,而不是通过ServiceBehaviorAttribute
确保我的服务已正确配置为并发操作。感谢拉迪斯拉夫为我指明了正确的方向。The issue was essentially that I was doing unnecessary concurrency by invoking
Task.Factory.StartNew
instead of making sure my service was properly configured for concurrency through theServiceBehaviorAttribute
. Thanks to Ladislav for pointng me in the right direction.