WCF 服务,由于“意外的文件结束”而引发流异常对于传入流

发布于 2024-11-19 15:06:52 字数 5069 浏览 1 评论 0原文

我编写了一个交换传输代理,用于将电子邮件正文上传到 WCF 服务。该服务与 Exchange 位于同一台机器上,侦听 localhost:1530 以接收传入的 TCP 连接。传输代理是使用 .NET 3.5 Framework 实现的,并且该服务在 .NET 4.0 框架上实现的 Windows 服务中自托管。

我的问题:为什么流在阅读完成之前就终止了,我该如何修复它?

服务合同的定义如下:

[ServiceContract]
public interface IReceiveService
{
    [OperationContract]
    Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);

    [OperationContract]
    void ImportMessageBody(IMessageBody body);

    [OperationContract]
    void ImportMessageAttachment(IMessageAttachment attachment);
}

更新:我已重新订购这是为了让人们更容易快速阅读问题,而不必阅读我的描述的其余部分,这很长。显示我如何启动 Task 来处理请求的第一部分似乎是问题所在。事实上,如果我注释掉 Task.Factory.StartNew() 部分,Stream.CopyTo 就可以工作。

在我的服务实现中,我尝试使用 Stream.CopyTo 将传入流复制到临时文件,如下所示:

public void ImportMessageBody(IMessageBody body)
{
    Task.Factory.StartNew(() =>
        {
            string fileName = GetFileNameFromMagicalPlace();
            using (FileStream fs = new FileStream(fileName, FileMode.Append))
            {
                body.Data.CopyTo(fs); // <---- throws System.IOException right here
            }
        });
}

异常错误为:“错误:读取流时引发异常。”堆栈跟踪:

   at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 
   at System.IO.Stream.CopyTo...

存在内部异常:

System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope. 
    at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3) 
    at System.Xml.XmlBufferReader.GetByteHard() 
    at System.Xml.XmlBufferReader.ReadMultiByteUInt31() 
    at System.Xml.XmlBinaryReader.ReadName(StringHandle handle) 
    at System.Xml.XmlBinaryReader.ReadNode() 
    at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 

其他详细信息如下。


IMessageBody 的定义如下:

[MessageContract]
public class IMessageBody
{
    [MessageHeader]
    public Guid MessageID { get; set; }

    [MessageHeader]
    public string Format { get; set; }

    [MessageBodyMember]
    public System.IO.Stream Data { get; set; }
}

显示相关位的传输代理的缩写版本(我希望):

public class Agent : RoutingAgent
{
    public delegate void PostingDelegate(MailItem item);
    IReceiveService service;    

    public Agent()
    {
        string tcpServiceUri = "net.tcp://localhost:1530";

        NetTcpBinding endpointBinding = new NetTcpBinding();
        endpointBinding.TransferMode = TransferMode.Streamed;

        ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
            ContractDescription.GetContract(typeof(IReceiveService)),
            endpointBinding,
            new EndpointAddress(tcpServiceUri));
        ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
        service = factory.CreateChannel();

        this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
    }

    void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
    {
        if (TheseAreTheDroidsImLookingFor(e))
        {
            PostingDelegate del = PostData;
            del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
        }
    }

    void PostData(MailItem item)
    {
        // Body class is basically direct implementation of IMessageBody 
        // with a constructor to set up the public properties from MailItem.
        var body = new Body(item); 
        service.ImportMessageBody(body);
    }

    void CompletePost(IAsyncResult ar)
    {
        var context = ar.AsyncState as AgentAsyncContext;
        context.Complete();
    }
}

最后,服务实现的托管如下:

string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
            try
            {
                host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
                host.AddDefaultEndpoints();
                var endpoint = host.Description.Endpoints.First();
                ((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;                 

                trace.Log(Level.Debug,String.Format("Created service host: {0}", host));                

                host.Open();                
                trace.Log(Level.Debug,"Opened service host.");
            }
            catch (Exception e)
            {
                string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
                trace.Log(Level.Debug,message);
                trace.Log(Level.Error, message);
            }

I've written an exchange transport agent that uploads the body of an email message to a WCF service. The service sits on the same box as Exchange, listening on localhost:1530 to receive incoming TCP connections. The transport agent is implemented with the .NET 3.5 Framework, and the service is self-hosted in a Windows Service implemented on the .NET 4.0 framework.

My question: why is the stream terminated before reading is complete, and how do I fix it?

The service contract is defined like this:

[ServiceContract]
public interface IReceiveService
{
    [OperationContract]
    Guid ImportMessage(DateTime dateTimeReceived, string from, IList<string> to, string subject, int attachmentCount, bool bodyEmpty, Guid clientID);

    [OperationContract]
    void ImportMessageBody(IMessageBody body);

    [OperationContract]
    void ImportMessageAttachment(IMessageAttachment attachment);
}

Update: I've re-ordered this to make it easier for someone to quickly read the problem without necessarily having to read the rest of my description, which is long. This first bit showing how I kick off a Task to process requests seems to be the problem. Indeed, if I comment out the Task.Factory.StartNew() part, the Stream.CopyTo works.

In my service implementation, I try to use Stream.CopyTo to copy the incoming stream to a temp file like this:

public void ImportMessageBody(IMessageBody body)
{
    Task.Factory.StartNew(() =>
        {
            string fileName = GetFileNameFromMagicalPlace();
            using (FileStream fs = new FileStream(fileName, FileMode.Append))
            {
                body.Data.CopyTo(fs); // <---- throws System.IOException right here
            }
        });
}

The exception error is: "error: An exception has been thrown when reading the stream." The stack trace:

   at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 
   at System.IO.Stream.CopyTo...

There is an inner exception:

System.Xml.XmlException: Unexpected end of file. Following elements are not closed: Address, IMessageBody, Body, Envelope. 
    at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3) 
    at System.Xml.XmlBufferReader.GetByteHard() 
    at System.Xml.XmlBufferReader.ReadMultiByteUInt31() 
    at System.Xml.XmlBinaryReader.ReadName(StringHandle handle) 
    at System.Xml.XmlBinaryReader.ReadNode() 
    at System.ServiceModel.Dispatcher.StreamFormatter.MessageBodyStream.Read(Byte[] buffer, Int32 offset, Int32 count) 

Other details follow.


IMessageBody is defined like this:

[MessageContract]
public class IMessageBody
{
    [MessageHeader]
    public Guid MessageID { get; set; }

    [MessageHeader]
    public string Format { get; set; }

    [MessageBodyMember]
    public System.IO.Stream Data { get; set; }
}

An abbreviated version of the transport agent showing the relevant bits (I hope):

public class Agent : RoutingAgent
{
    public delegate void PostingDelegate(MailItem item);
    IReceiveService service;    

    public Agent()
    {
        string tcpServiceUri = "net.tcp://localhost:1530";

        NetTcpBinding endpointBinding = new NetTcpBinding();
        endpointBinding.TransferMode = TransferMode.Streamed;

        ServiceEndpoint serviceEndpoint = new ServiceEndpoint(
            ContractDescription.GetContract(typeof(IReceiveService)),
            endpointBinding,
            new EndpointAddress(tcpServiceUri));
        ChannelFactory<IReceiveService> factory = new ChannelFactory<IReceiveService>(serviceEndpoint);
        service = factory.CreateChannel();

        this.OnSubmittedMessage += new SubmittedMessageEventHandler(Agent_OnSubmittedMessage);
    }

    void Agent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
    {
        if (TheseAreTheDroidsImLookingFor(e))
        {
            PostingDelegate del = PostData;
            del.BeginInvoke(e.MailItem, CompletePost, GetAgentAsyncContext());
        }
    }

    void PostData(MailItem item)
    {
        // Body class is basically direct implementation of IMessageBody 
        // with a constructor to set up the public properties from MailItem.
        var body = new Body(item); 
        service.ImportMessageBody(body);
    }

    void CompletePost(IAsyncResult ar)
    {
        var context = ar.AsyncState as AgentAsyncContext;
        context.Complete();
    }
}

Lastly, the service implementation is hosted like this:

string queueUri = String.Format("net.tcp://localhost:{0}/{1}", port, serviceName);
            try
            {
                host = new ServiceHost(typeof(ReceiveService), new Uri(queueUri));
                host.AddDefaultEndpoints();
                var endpoint = host.Description.Endpoints.First();
                ((NetTcpBinding)endpoint.Binding).TransferMode = TransferMode.Streamed;                 

                trace.Log(Level.Debug,String.Format("Created service host: {0}", host));                

                host.Open();                
                trace.Log(Level.Debug,"Opened service host.");
            }
            catch (Exception e)
            {
                string message = String.Format("Threw exception while attempting to create ServiceHost for {0}:{1}\n{2}", queueUri, e.Message, e.StackTrace);
                trace.Log(Level.Debug,message);
                trace.Log(Level.Error, message);
            }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

小帐篷 2024-11-26 15:06:52

好吧,代码太多了。您应该尝试将代码最小化为尽可能小的可重现示例。我想问题可能出在这里:

public void ImportMessageBody(IMessageBody body)
{
    Task.Factory.StartNew(() =>
        {
            string fileName = GetFileNameFromMagicalPlace();
            using (FileStream fs = new FileStream(fileName, FileMode.Append))
            {
                body.Data.CopyTo(fs); // <---- throws System.IOException right here
            }
        });
}

默认情况下应用了 OperationBehavior 此行为包含默认设置为 true 的 AutoDisposeParameter ,其中 在以下情况下处置所有一次性参数操作结束。因此,我希望该操作在您的 Task 能够处理整个流之前结束。

Well it is too much code. You should try to minimize code to smallest possible reproducible example. I guess the problem can be here:

public void ImportMessageBody(IMessageBody body)
{
    Task.Factory.StartNew(() =>
        {
            string fileName = GetFileNameFromMagicalPlace();
            using (FileStream fs = new FileStream(fileName, FileMode.Append))
            {
                body.Data.CopyTo(fs); // <---- throws System.IOException right here
            }
        });
}

There is OperationBehavior applied by default this behavior contains AutoDisposeParameter set by default to true which disposes all disposable parameters when the operation ends. So I expect that operation ends before your Task is able to process the whole stream.

十级心震 2024-11-26 15:06:52

问题本质上是我通过调用 Task.Factory.StartNew 进行了不必要的并发操作,而不是通过 ServiceBehaviorAttribute 确保我的服务已正确配置为并发操作。感谢拉迪斯拉夫为我指明了正确的方向。

The issue was essentially that I was doing unnecessary concurrency by invoking Task.Factory.StartNew instead of making sure my service was properly configured for concurrency through the ServiceBehaviorAttribute. Thanks to Ladislav for pointng me in the right direction.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文