想要在 C# .net 中使用安全线程快速读取 msmq 消息

发布于 2024-10-21 23:45:23 字数 1253 浏览 0 评论 0原文

我想读取 MSMQ,其中以字节为单位的队列数据以及在 1 分钟内生成的队列数量约为 1500。因此,如果连续读取,队列 CPU 会持续 30%。一段时间后它停止了。我需要长达 4 小时的大量读取队列.. 所以我希望以不应该被阻塞的方式安全地读取线程。 实际上我不擅长线程所以你能帮我吗..

目前我正在以这种方式阅读

    bool ProcessStatus; //process
    Thread _UDPthreadConsme;

    private void btn_receive_Click(object sender, EventArgs e)
    {

    if (MessageQueue.Exists(@".\private$\myquelocal"))
    {

    ThreadStart _processrcs = new ThreadStart(receivemessages);
    _UDPthreadConsme = new Thread(_processrcs);
    ProcessStatus = true;
    _UDPthreadConsme.Start();
    }
    }


    private void receivemessages()
    {
    MessageBox.Show("Start");
    while (ProcessStatus)
    {
    try
    {

    // Connect to the a queue on the local computer.
    MessageQueue myQueue = new MessageQueue(@".\private$\myquelocal");


    System.Messaging.Message[] myMessagecount = myQueue.GetAllMessages();

    if (myMessagecount.Length <= 0)
    return;


    myQueue.Formatter = new BinaryMessageFormatter();

    // Receive and format the message.
    System.Messaging.Message myMessage = myQueue.Receive();
    byte[] buffer = (byte[])myMessage.Body;

// here i convert buffer to its related structure and then insert the values in database sqlserver.

}
} 

i want to read the MSMQ where queue data in byte and numbers of queues generated in 1 mins about 1500. so if read continuously queue cpu goes on 30%. and after some time it stopped. i need to read queue in high volume upto 4 hrs..
so i want safe thread reading in such a manner that shouldn't be block.
actually i am not good about threading so can you please help me out..

currently i am reading in this manner

    bool ProcessStatus; //process
    Thread _UDPthreadConsme;

    private void btn_receive_Click(object sender, EventArgs e)
    {

    if (MessageQueue.Exists(@".\private$\myquelocal"))
    {

    ThreadStart _processrcs = new ThreadStart(receivemessages);
    _UDPthreadConsme = new Thread(_processrcs);
    ProcessStatus = true;
    _UDPthreadConsme.Start();
    }
    }


    private void receivemessages()
    {
    MessageBox.Show("Start");
    while (ProcessStatus)
    {
    try
    {

    // Connect to the a queue on the local computer.
    MessageQueue myQueue = new MessageQueue(@".\private$\myquelocal");


    System.Messaging.Message[] myMessagecount = myQueue.GetAllMessages();

    if (myMessagecount.Length <= 0)
    return;


    myQueue.Formatter = new BinaryMessageFormatter();

    // Receive and format the message.
    System.Messaging.Message myMessage = myQueue.Receive();
    byte[] buffer = (byte[])myMessage.Body;

// here i convert buffer to its related structure and then insert the values in database sqlserver.

}
} 

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

债姬 2024-10-28 23:45:23

我会像这样重写代码

    private void receivemessages()
    {
        Console.WriteLine("Start");

        MessageQueue myQueue = new MessageQueue(@".\private$\myquelocal");

        while (ProcessStatus)
        {
            try
            {
                // Waits 100 millisec for a message to appear in queue
                System.Messaging.Message msg = myQueue.Receive(new TimeSpan(0, 0, 0, 0, 100));

                // Take care of message and insert data into database

            }
            catch (MessageQueueException)
            {
                // Ignore the timeout exception and continue processing the queue

            }
        }
    }

I would rewrite the code like this

    private void receivemessages()
    {
        Console.WriteLine("Start");

        MessageQueue myQueue = new MessageQueue(@".\private$\myquelocal");

        while (ProcessStatus)
        {
            try
            {
                // Waits 100 millisec for a message to appear in queue
                System.Messaging.Message msg = myQueue.Receive(new TimeSpan(0, 0, 0, 0, 100));

                // Take care of message and insert data into database

            }
            catch (MessageQueueException)
            {
                // Ignore the timeout exception and continue processing the queue

            }
        }
    }
淡写薰衣草的香 2024-10-28 23:45:23

这是在控制台上运行并异步读取队列的类的示例。这是最安全、最快的方法。但请注意,根据您运行此程序的位置,如果您正在执行诸如使用消息正文更新文本框或类似操作之类的操作,则仍然需要某种锁定机制。

public sealed class ConsoleSurrogate {

    MessageQueue _mq = null;

    public override void Main(string[] args) {

        _mq = new MessageQueue(@".\private$\my_queue", QueueAccessMode.Receive);
        _mq.ReceiveCompleted += new ReceiveCompletedEventHandler(_mq_ReceiveCompleted);
        _mq.Formatter = new ActiveXMessageFormatter();
        MessagePropertyFilter filter = new MessagePropertyFilter();
        filter.Label = true;
        filter.Body = true;
        filter.AppSpecific = true;
        _mq.MessageReadPropertyFilter = filter;
        this.DoReceive();

        Console.ReadLine();
        _mq.Close();
    }

    void _mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e) {
        _mq.EndReceive(e.AsyncResult);
        Console.WriteLine(e.Message.Body);
        this.DoReceive();
    }

    private void DoReceive() {
        _mq.BeginReceive();
    }
}

This is an example of a class that runs on the console and reads a queue asynchronously. This is the safest and fastest way to do this. However note that depending on where you're running this, you'll still need to have some kind of locking mechanism if you're doing things like updating text boxes with the message body or something like that.

public sealed class ConsoleSurrogate {

    MessageQueue _mq = null;

    public override void Main(string[] args) {

        _mq = new MessageQueue(@".\private$\my_queue", QueueAccessMode.Receive);
        _mq.ReceiveCompleted += new ReceiveCompletedEventHandler(_mq_ReceiveCompleted);
        _mq.Formatter = new ActiveXMessageFormatter();
        MessagePropertyFilter filter = new MessagePropertyFilter();
        filter.Label = true;
        filter.Body = true;
        filter.AppSpecific = true;
        _mq.MessageReadPropertyFilter = filter;
        this.DoReceive();

        Console.ReadLine();
        _mq.Close();
    }

    void _mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e) {
        _mq.EndReceive(e.AsyncResult);
        Console.WriteLine(e.Message.Body);
        this.DoReceive();
    }

    private void DoReceive() {
        _mq.BeginReceive();
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文