从 Parallel.ForEach 循环内部调用异步方法时出现无效操作异常

发布于 2024-12-05 02:26:07 字数 3475 浏览 0 评论 0原文

我继承了一个 Windows 服务,可以处理队列中的大量电子邮件。听起来很简单,抓取队列,发送电子邮件,如果 SmtpClient.SendAsync 没有从回调中返回错误,则将数据库中的电子邮件标记为正在发送。我正在使用信号量在线程上等待多个可以调用 SMTP 客户端的异步发送方法。这是我获取状态的唯一方法,并且根据 Microsoft 文档,它必须先完成操作,然后才能使另一个调用异步。现在是有趣的部分。我决定使用 Parallel.ForEach 来让他像这样排队。该方法在Windows服务OnStart中被调用。请注意,我尝试在单独的线程上调用此方法并得到相同的结果。

我在想,要么是,由于我缺乏关于线程的知识,我错过了一些明显的东西,要么是有问题的东西。最有可能的是 A。

 private static void ProcessEmailQueue()
    {
        List<EmailQueue> emailQueue =
            _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
        Parallel.ForEach(emailQueue, message =>
                                         {
                                             _smtpMail.FromAddress = message.FromAddress;
                                             _smtpMail.ToAddress = message.ToAddress;
                                             _smtpMail.Subject = message.Subject;
                                             _smtpMail.SendAsHtml = message.IsHtml > 0;
                                             _smtpMail.MessageBody = message.MessageBody;
                                             _smtpMail.UserToken = message.EmailQueueID;
                                             bool sendStatus = _smtpMail.SendMessage();
                                                 // THIS BLOWS UP with InvalidOperation Exception
                                         });
    }

这是从循环内调用的 SMTP 方法。

public bool SendMessage()
    {
        mailSendSemaphore = new Semaphore(0, 10); // This is defined as  private static Semaphore mailSendSemaphore;
        try
        {
            var fromAddress = new MailAddress(FromAddress);
            var toAddress = new MailAddress(ToAddress);

            using (var mailMessage = new MailMessage(fromAddress, toAddress))
            {
                mailMessage.Subject = Subject;
                mailMessage.IsBodyHtml = SendAsHtml;
                mailMessage.Body = MessageBody;
                Envelope = mailMessage;
                smtp.SendCompleted += smtp_SendCompleted;
                smtp.SendAsync(mailMessage, UserToken);
                mailSendSemaphore.WaitOne();
                return _mailSent;
            }
        }
        catch (Exception exception)
        {
            _logger.Error(exception);

            return _mailSent;
        }
    }

CALLBACK For Smtp Send

 private void smtp_SendCompleted(object sender, AsyncCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
        }
        if (e.Error != null)
        {
        }
        else
        {
            _mailSent = true;
        }
        mailSendSemaphore.Release(2);
    }

这是异常,由于某些奇怪的原因花了一些时间才得到它。

System.InvalidOperationException was unhandled by user code

消息=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。 来源=系统 堆栈跟踪: 在 System.Net.Mail.SmtpClient.SendAsync(MailMessage 消息,对象 userToken) 在 SmtpMail.cs 中的 DFW.Infrastruct.Communications.SmtpMail.SendMessage() 处:第 71 行 在 Service1.cs 中的 EmaiProcessorService.EmailQueueService.b_0(EmailQueue 消息):第 57 行 在 System.Threading.Tasks.Parallel.<>c_DisplayClass2d2.b__23(Int32 i) 在 System.Threading.Tasks.Parallel.<>c__DisplayClassf1.b__c() InnerException:

似乎我的 waitone 被 System.Threading.Tasks.Parallel 删除了

I have inherited a windows service that processes a large number of e-mails in a queue. Sounds simple, Grab queue, send e-mail, if SmtpClient.SendAsync does not return an error from the call back then flag the e-mail in the DB as being sent.. I am using a Semaphore to waitone on the thread so multiple calls can be made to the Async Send method of the SMTP Client. This is the only way I can get the status and per Microsoft docs it has to finish the operation before another call can be made async. So now for the fun part. I decided to use a Parallel.ForEach to get he queue like so. This method is called in the Windows Service OnStart. Please note I have tried calling this method on a separate Thread and get the same results.

I am thinking that either A, I am missing something obvious, due to my lack of knowledge on threading, or something is flat bugged. Most likely A.

 private static void ProcessEmailQueue()
    {
        List<EmailQueue> emailQueue =
            _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();
        Parallel.ForEach(emailQueue, message =>
                                         {
                                             _smtpMail.FromAddress = message.FromAddress;
                                             _smtpMail.ToAddress = message.ToAddress;
                                             _smtpMail.Subject = message.Subject;
                                             _smtpMail.SendAsHtml = message.IsHtml > 0;
                                             _smtpMail.MessageBody = message.MessageBody;
                                             _smtpMail.UserToken = message.EmailQueueID;
                                             bool sendStatus = _smtpMail.SendMessage();
                                                 // THIS BLOWS UP with InvalidOperation Exception
                                         });
    }

Here is the SMTP Method being called from withing the loop.

public bool SendMessage()
    {
        mailSendSemaphore = new Semaphore(0, 10); // This is defined as  private static Semaphore mailSendSemaphore;
        try
        {
            var fromAddress = new MailAddress(FromAddress);
            var toAddress = new MailAddress(ToAddress);

            using (var mailMessage = new MailMessage(fromAddress, toAddress))
            {
                mailMessage.Subject = Subject;
                mailMessage.IsBodyHtml = SendAsHtml;
                mailMessage.Body = MessageBody;
                Envelope = mailMessage;
                smtp.SendCompleted += smtp_SendCompleted;
                smtp.SendAsync(mailMessage, UserToken);
                mailSendSemaphore.WaitOne();
                return _mailSent;
            }
        }
        catch (Exception exception)
        {
            _logger.Error(exception);

            return _mailSent;
        }
    }

CALLBACK For Smtp Send

 private void smtp_SendCompleted(object sender, AsyncCompletedEventArgs e)
    {
        if (e.Cancelled)
        {
        }
        if (e.Error != null)
        {
        }
        else
        {
            _mailSent = true;
        }
        mailSendSemaphore.Release(2);
    }

Here is the Exception, took a few to get it for some odd reason.

System.InvalidOperationException was unhandled by user code

Message=An asynchronous call is already in progress. It must be completed or canceled before you can call this method.
Source=System
StackTrace:
at System.Net.Mail.SmtpClient.SendAsync(MailMessage message, Object userToken)
at DFW.Infrastructure.Communications.SmtpMail.SendMessage() in SmtpMail.cs:line 71
at EmaiProcessorService.EmailQueueService.b_0(EmailQueue message) in Service1.cs:line 57
at System.Threading.Tasks.Parallel.<>c
_DisplayClass2d2.<ForEachWorker>b__23(Int32 i)
at System.Threading.Tasks.Parallel.<>c__DisplayClassf
1.b__c()
InnerException:

Seems my waitone is getting obliterated by System.Threading.Tasks.Parallel

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

z祗昰~ 2024-12-12 02:26:07

好的,现在我们已经得到了错误文本,看起来相当清楚:

Message=异步调用已在进行中。必须先完成或取消它,然后才能调用此方法。

这与文档一致:

两个简单的选项:

  • 创建一个固定数量的客户端和要发送的消息队列。让每个客户端每次完成时都从队列中获取一条消息,直到队列为空。 BlockingCollection对此很有用.

  • 创建一个新的 SmtpClient信息。这可能会导致您对 SMTP 服务器有效地发起 DOS 攻击,这并不理想。

老实说,当您只是等待消息发送时,并不清楚为什么要使用 SendAsync...

Okay, now that we've got the error text, it seems fairly clear:

Message=An asynchronous call is already in progress. It must be completed or canceled before you can call this method.

This concurs with the documentation:

Two simple options:

  • Create a fixed number of clients, and a queue of messages to send. Make each client take a message from the queue each time it finishes, until the queue is empty. BlockingCollection<T> is good for this.

  • Create a new SmtpClient per message. This could cause you to effectively launch a DOS attack on your SMTP server, which isn't ideal.

To be honest, it's not really clear why you're using SendAsync when you're then just waiting for the message to be sent anyway...

太傻旳人生 2024-12-12 02:26:07

我不清楚你为什么在这里使用信号量,但几乎可以肯定你使用不正确。您将为每次调用 SendMessage 创建一个新的信号量实例。另外,您将对其调用一次 WaitOne,然后调用 Release(2),因此最终您将获得比获取的更多的释放。这可能就是导致 InvalidOperationException 的原因。

并行处理电子邮件队列对您没有任何好处,因为您一次只能发送一封消息。尝试在 Parallel.Foreach 内部异步执行此操作只会导致不必要的复杂化。

您最好使用 ThreadPool.QueueUserWorkItem 之类的东西,并使用一个一次发送一条消息的简单循环。

List<EmailQueue> emailQueue =
    _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();

ThreadPool.QueueUserWorkItem(ProcessEmailQueue, emailQueue);

void ProcessEmailQueue(object state)
{
    List<EmailQueue> emailQueue = (List<EmailQueue>)state;
    foreach (var message in EmailQueue)
    {
        // Format and send message here.
    }
}

或者,您可以使用 Task 执行相同的操作。关键是您只需要一个线程来顺序处理队列。由于您一次无法发送多条消息,因此 Parallel.ForEach 对您没有任何好处。

编辑:

如果您需要一次进行多次发送,您可以修改原始代码。首先,在类范围内初始化信号量:

private static Semaphore mailSendSemaphore = new Semaphore(10, 10);

然后,在 SendMessage 方法中:

bool SendMessage()
{
    // acquire semaphore. This will block until there's a slot available.
    mailSendSemaphore.WaitOne();
    try
    {
        // do all your processing here, including sending the message.
        // use Send rather than SendAsync
    }
    finally
    {
        mailSendSemaphore.Release();
    }
}

无需使用 SendAsync

I'm not clear on why you're using a Semaphore here, but you're almost certainly using it incorrectly. You're creating a new semaphore instance for each call to SendMessage. Also, you're calling WaitOne on it once, and then calling Release(2), so eventually you'll have more releases than acquires. That's probably what causes your InvalidOperationException.

It doesn't do you any good to parallelize processing of the email queue, since you can only send one message at a time. And trying to do it asynchronously inside of the Parallel.Foreach is just more needless complication.

You're better off using something like ThreadPool.QueueUserWorkItem, and having a simple loop that sends one message at a time.

List<EmailQueue> emailQueue =
    _repository.Select<EmailQueue>().Where(x => x.EmailStatuses.EmailStatus == "Pending").ToList();

ThreadPool.QueueUserWorkItem(ProcessEmailQueue, emailQueue);

void ProcessEmailQueue(object state)
{
    List<EmailQueue> emailQueue = (List<EmailQueue>)state;
    foreach (var message in EmailQueue)
    {
        // Format and send message here.
    }
}

Alternatively, you can do the same thing with a Task. The point is that you just need a single thread to process the queue sequentially. Since you can't send more than one message at a time, Parallel.ForEach doesn't do you any good.

EDIT:

If you need to do multiple sends at a time, you can probably modify your original code. First, initialize the semaphore at class scope:

private static Semaphore mailSendSemaphore = new Semaphore(10, 10);

Then, in your SendMessage method:

bool SendMessage()
{
    // acquire semaphore. This will block until there's a slot available.
    mailSendSemaphore.WaitOne();
    try
    {
        // do all your processing here, including sending the message.
        // use Send rather than SendAsync
    }
    finally
    {
        mailSendSemaphore.Release();
    }
}

There's no need to use SendAsync.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文