在 C# 中发送/接收 GZip 压缩的 MSMQ 消息

发布于 2024-12-02 06:19:47 字数 1235 浏览 0 评论 0原文

我正在尝试将大对象(> 30MB)发送到 MSMQ 队列。由于我们要发送的数据量很大,因此我们的想法是在发送对象之前对其进行 GZip,然后在接收端将其解压缩。

但是,将压缩流写入 message.BodyStream 属性似乎可以工作,但不能从那里读取它。 我不知道出了什么问题。

Message l_QueueMessage = new Message();
l_QueueMessage.Priority = priority;

using (MessageQueue l_Queue = CreateQueue())
{
    GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress);

    Formatter.Serialize(stream, message);

    l_Queue.Send(l_QueueMessage);
}

Formatter 是 BinaryFormatter 类型的全局属性。这用于序列化/反序列化为我们想要发送/接收的对象类型,例如“ProductItem”。

接收端看起来像这样:

GZipStream stream = new GZipStream(l_Message.BodyStream, CompressionMode.Decompress);

object decompressedObject = Formatter.Deserialize(stream);

ProductItem l_Item = decompressedObject as ProductItem;

m_ProductReceived(sender, new MessageReceivedEventArgs<ProductItem>(l_Item));

l_ProductQueue.BeginReceive();

我尝试反序列化时收到 EndOfStreamException "{"Unable to read except the end of the stream."} 在 System.IO.BinaryReader.ReadByte()

使用 messageBodyStream 属性实际上绕过了 message.Formatter,我没有将其初始化为任何内容,因为我正在将自己的 ser/deser 机制与 GZipStream 一起使用。但是,我不确定这是否是正确的方法。

我缺少什么? 谢谢!

I am trying to send large objects (>30MB) to a MSMQ queue. Due to the large amount of data we are are tring to send the idea was to GZip the objects prior to sending them, then unzipping them on the receiving end.

However, writing the compressed stream to the message.BodyStream property seems to work, but not reading it out from there.
I don't know what's wrong.

Message l_QueueMessage = new Message();
l_QueueMessage.Priority = priority;

using (MessageQueue l_Queue = CreateQueue())
{
    GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress);

    Formatter.Serialize(stream, message);

    l_Queue.Send(l_QueueMessage);
}

The Formatter is a global property of type BinaryFormatter. This is used to serialize/deserialize to the type of object we want to send/receive, e.g. "ProductItem".

The receving end looks like this:

GZipStream stream = new GZipStream(l_Message.BodyStream, CompressionMode.Decompress);

object decompressedObject = Formatter.Deserialize(stream);

ProductItem l_Item = decompressedObject as ProductItem;

m_ProductReceived(sender, new MessageReceivedEventArgs<ProductItem>(l_Item));

l_ProductQueue.BeginReceive();

I get an EndOfStreamException "{"Unable to read beyond the end of the stream."} trying to deserialize
at System.IO.BinaryReader.ReadByte()

Using the messageBodyStream property I actually circumvent the message.Formatter, which I don't initialize to anything, becasue I'm using my own ser/deser mechanism with the GZipStream. However, I am not sure if that's the correct way of doing this.

What am I missing?
Thanks!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

懒的傷心 2024-12-09 06:19:47

在您的原始代码中,问题是您需要关闭GZipStream才能正确写入GZip页脚,然后才能发送它。如果不这样做,您最终会发送无法反序列化的字节。这也是为什么稍后发送的新代码可以工作的原因。

In your original code, the problem is that you need to close the GZipStream in order for a GZip footer to be written correctly, and only then you can send it. If you dont, you end up sending bytes that can not be deserialized. That's also why your new code where sending is done later works.

寂寞美少年 2024-12-09 06:19:47

好的,我成功了。关键是将接收器上的解压流转换为 byte[] 数组。然后反序列化开始工作。

发送方代码(请注意,在发送消息之前流已关闭):

using (MessageQueue l_Queue = CreateQueue())
{
    using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress, true))
    {
        Formatter.Serialize(stream, message);
    }

    l_Queue.Send(l_QueueMessage);
}

接收端(请注意我如何将流转换为 byte[],然后反序列化):

using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Decompress))
{
    byte[] bytes = ReadFully(stream);

    using (MemoryStream ms = new MemoryStream(bytes))
    {
        decompressedObject = Formatter.Deserialize(ms);
    }
}

不知道为什么使用 ReadFully() 函数而不是 Stream.CopyTo() 可以工作。
有人吗?

顺便说一句,ReadFully() 是一个从 Stream 中创建 byte[] 的函数。我必须将此归功于 Jon Skeet,网址为 http://www.yoda.arachsys.com/ csharp/readbinary.html。谢谢!

OK, I made this work. The key was to convert the decompressed stream on the receiver to a byte[] array. Then the deserialization started working.

The sender code (notice the stream is closed before sending the message):

using (MessageQueue l_Queue = CreateQueue())
{
    using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Compress, true))
    {
        Formatter.Serialize(stream, message);
    }

    l_Queue.Send(l_QueueMessage);
}

The receiving end (notice how I convert the stream to a byte[] then deserialize):

using (GZipStream stream = new GZipStream(l_QueueMessage.BodyStream, CompressionMode.Decompress))
{
    byte[] bytes = ReadFully(stream);

    using (MemoryStream ms = new MemoryStream(bytes))
    {
        decompressedObject = Formatter.Deserialize(ms);
    }
}

Still, don't know why this works using the ReadFully() function and not the Stream.CopyTo().
Does anyone?

Btw, ReadFully() is a function that creates a byte[] out of a Stream. I have to credit Jon Skeet for this at http://www.yoda.arachsys.com/csharp/readbinary.html. Thanks!

望喜 2024-12-09 06:19:47

尝试将压缩和发送分开:

byte[] binaryBuffer = null;
using (MemoryStream compressedBody = new MemoryStream()) 
{
    using(GZipStream stream = new GZipStream(compressedBody, CompressionMode.Compress))
    {
        Formatter.Serialize(compressedBody, message); 
        binaryBuffer = compressedBody.GetBuffer();
    }
}

using (MessageQueue l_Queue = CreateQueue())        
{            
    l_QueueMessage.BodyStream.Write(binaryBuffer, 0, binaryBuffer.Length);
    l_QueueMessage.BodyStream.Seek(0, SeekOrigin.Begin);
    l_Queue.Send(l_QueueMessage);
}

Try to separate compressing and sending:

byte[] binaryBuffer = null;
using (MemoryStream compressedBody = new MemoryStream()) 
{
    using(GZipStream stream = new GZipStream(compressedBody, CompressionMode.Compress))
    {
        Formatter.Serialize(compressedBody, message); 
        binaryBuffer = compressedBody.GetBuffer();
    }
}

using (MessageQueue l_Queue = CreateQueue())        
{            
    l_QueueMessage.BodyStream.Write(binaryBuffer, 0, binaryBuffer.Length);
    l_QueueMessage.BodyStream.Seek(0, SeekOrigin.Begin);
    l_Queue.Send(l_QueueMessage);
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文