异步调用事件并捕获异常,而无需阻止其他任务运行

发布于 2025-02-02 00:30:06 字数 4103 浏览 3 评论 0原文

给定以下代码:

public delegate Task AsyncEventHandler<in TEventArgs>(object sender, TEventArgs eventArgs);

public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(it => it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks);
}

我有一个测试功能,其中有缺陷的处理程序应抛出异常,并且WorkingHanlder应该运行 - 目前只有第一个FARKYHANDLER1被调用,而没有其他事件处理程序。

private class NonGenericNotifier
{
    public event AsyncEventHandler SomethingHappened;
    public Task OnSomethingHappening() => SomethingHappened.InvokeAsync(this, EventArgs.Empty);
}

public async Task Multiple_Exceptions_That_Occur_During_Event_Handling_Should_Be_Propagated()
{
    var isHandler1Called = false;
    var isHandler2Called = false;
    var isWorkingHandlerCalled = false;

    var notifier = new NonGenericNotifier();

    Task FaultyHandler1(object sender, EventArgs eventArgs)
    {
        isHandler1Called = true;
        throw new InvalidOperationException();
    }

    Task FaultyHandler2(object sender, EventArgs eventArgs)
    {
        isHandler2Called = true;
        throw new InvalidOperationException();
    }

    Task WorkingHandler(object sender, EventArgs eventArgs)
    {
        isWorkingHandlerCalled = true;
        return Task.CompletedTask;
    }

    notifier.SomethingHappened += FaultyHandler1;
    notifier.SomethingHappened += FaultyHandler2;
    notifier.SomethingHappened += WorkingHandler;

    await Should.ThrowAsync<InvalidOperationException>(async () => await notifier.OnSomethingHappening());
    isHandler1Called.ShouldBe(true);
    isHandler2Called.ShouldBe(true);
    isWorkingHandlerCalled.ShouldBe(true);
}

假设可以提出一个例外,我相信这应该是gentregateException包含每个任务的例外,最重要的是,上面的InvokeAsync方法应在第一个例外情况下保释。

我已经开始在InvoKeAsync扩展方法中创建lt; exception&gt;,并包装每个it =&gt; IT.in.invoke(发件人,EventArgs)带有try/Catch构造,并且在CANGE中将异常添加到异常列表中。

但是,我对如何整理此异常列表感到迷失,然后以gencregateException发送。

更新(修复?)

感谢Artur将我指向正确的方向。我将InvokeAsync扩展方法更改为下面的扩展方法,它 works - 不再停止第一个任务。我们从var tasks = delegates.Select(it =&gt; it.invoke(sender,eventArgs)); 使用代码在这里

public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks).WithAggregatedExceptions();
}

static Task WithAggregatedExceptions(this Task task)
{
    // using AggregateException.Flatten as a bonus
    return task.ContinueWith(
        continuationFunction: priorTask =>
            priorTask.IsFaulted &&
            priorTask.Exception is AggregateException ex && (ex.InnerExceptions.Count > 1 || ex.InnerException is AggregateException) ? Task.FromException(ex.Flatten()) : priorTask,
        cancellationToken: CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        scheduler: TaskScheduler.Default).Unwrap();
}

我的问题是此事件的订阅者,编写我无法控制的同步处理程序 -停止连接连接到同一事件的另一个事件处理程序(同步和异步)。

我也感谢这是task.thenall的设计函数,如果您要混合异步和非Async处理程序...如果有一个理由不带有没有的异步函数中的同步代码等待Task.yield()就是这样。

问题

我们可以说包裹demeg.select(async it =&gt;等待it.invoke(sender,evensargs)用异步/等待允许同步方法运行,并且在最差的(?)中,将异步方法(与嵌套异步/等待函数呼叫相同),所以实际上是一个非问题

?赏金寻求有关如何实施的权威指导,一个答案(为讨论做出贡献非常感谢)说避免异步事件,但在其他地方,例如Discord c#客户端,他们接受了异步事件(带有超时包装器等)。

Given the following code:

public delegate Task AsyncEventHandler<in TEventArgs>(object sender, TEventArgs eventArgs);

public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(it => it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks);
}

I have a test function whereby the faulty handlers should throw exceptions, and the workinghanlder should run - currently only the first FaultyHandler1 is called and no others event handlers.

private class NonGenericNotifier
{
    public event AsyncEventHandler SomethingHappened;
    public Task OnSomethingHappening() => SomethingHappened.InvokeAsync(this, EventArgs.Empty);
}

public async Task Multiple_Exceptions_That_Occur_During_Event_Handling_Should_Be_Propagated()
{
    var isHandler1Called = false;
    var isHandler2Called = false;
    var isWorkingHandlerCalled = false;

    var notifier = new NonGenericNotifier();

    Task FaultyHandler1(object sender, EventArgs eventArgs)
    {
        isHandler1Called = true;
        throw new InvalidOperationException();
    }

    Task FaultyHandler2(object sender, EventArgs eventArgs)
    {
        isHandler2Called = true;
        throw new InvalidOperationException();
    }

    Task WorkingHandler(object sender, EventArgs eventArgs)
    {
        isWorkingHandlerCalled = true;
        return Task.CompletedTask;
    }

    notifier.SomethingHappened += FaultyHandler1;
    notifier.SomethingHappened += FaultyHandler2;
    notifier.SomethingHappened += WorkingHandler;

    await Should.ThrowAsync<InvalidOperationException>(async () => await notifier.OnSomethingHappening());
    isHandler1Called.ShouldBe(true);
    isHandler2Called.ShouldBe(true);
    isWorkingHandlerCalled.ShouldBe(true);
}

Assuming a single exception can be thrown I beleive this should be an AggregateException containing an exception for each Task, and most importantly the InvokeAsync method above should bail on the first exception encountered.

I have started to create a List<Exception> within the InvokeAsync extension method, and wrap each it => it.Invoke(sender, eventArgs) with a try/catch construct and within the catch add the exception to the exception list.

However I am lost on how to collate this list of exceptions and then send on as an AggregateException.

UPDATE (FIX?)

Thanks to Artur for pointing me in the right direction. I changed the InvokeAsync extension method to the below, and it works - no longer halting on the first task. We have gone from var tasks = delegates.Select(it => it.Invoke(sender, eventArgs)); to the below using the code here:

public static Task InvokeAsync(this AsyncEventHandler eventHandler, object sender, EventArgs eventArgs)
{
    if (eventHandler == null) return Task.CompletedTask;

    var delegates = eventHandler.GetInvocationList().Cast<AsyncEventHandler>();
    var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

    return Task.WhenAll(tasks).WithAggregatedExceptions();
}

static Task WithAggregatedExceptions(this Task task)
{
    // using AggregateException.Flatten as a bonus
    return task.ContinueWith(
        continuationFunction: priorTask =>
            priorTask.IsFaulted &&
            priorTask.Exception is AggregateException ex && (ex.InnerExceptions.Count > 1 || ex.InnerException is AggregateException) ? Task.FromException(ex.Flatten()) : priorTask,
        cancellationToken: CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        scheduler: TaskScheduler.Default).Unwrap();
}

My issue is subscribers of this event, writing synchronous handlers over which I have no control - this would stop the other event handlers (sync and async) running that are attached to the same event.

I also appreciate this is the designed function of Task.WhenAll if you were mixing async and non-async handlers... if there is one reason to not write synchronous code in an async function without await Task.Yield() this is it.

Question

Can we say that wrapping the delegates.Select(async it => await it.Invoke(sender, eventArgs) with async/await allows synchronous method to run, and at worst(?) wrap twice an async method (which is the same as nesting async/await function calls) so is actually a non-issue?

Are there any side effects that have been introduced?

With the bounty looking for authorative guidance on how this would be implemented, one answer (much appreciated for contributing to the discussion) says to avoid async events, yet in other places like the discord c# client they have embraced async events (with timeout wrappers etc).

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

铃予 2025-02-09 00:30:06

task.whenall将在重新简介其参数时调用所有处理程序。它将一次调用一个,然后异步等待所有任务完成。

您看到第一个例外停止的原因是因为在重新化过程中抛出了例外。异步(返回任务)功能是正常的,可以在返回的任务上放置任何异常。异步功能直接抛出异常是异常的。

因此,这是有问题的代码:

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

其中之一是正确的:

async Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  return Task.FromException(new InvalidOperationException());
}

您正在看到奇怪的行为,因为异步处理程序的行为不端(通过抛出同步异常)。

现在,如果您想要允许异步的处理程序,那么您 can can 都可以使用显式尝试/catch >或额外的async/等待

var tasks = delegates.Select(it => try { return it.Invoke(sender, eventArgs); } catch (Exception ex) { return Task.FromException(ex); });

// async/await is necessary to handle misbehaving asynchronous handlers that throw synchronous exceptions
var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

如果您保留async/等待等待方法,请进行评论,因为这样的编码结构通常被认为是虚假的,并且可以被未来的维护者删除。

withAggregatedExceptions看起来不错,但是如果需要,可以简化它:

static async Task WithAggregatedExceptions(this Task task)
{
  try { await task; }
  catch { throw task.Exception; }
}

我的问题是此事件的订户,编写我无法控制的同步处理程序 - 这将阻止运行在同一事件上的其他事件处理程序(Sync和async)。

好吧,是的。 task.whenall同步重新安排其任务收集,该任务一次调用所有处理程序。

如果要同时允许同步处理程序以及异步处理程序,则您 can can 将调用包装在task.run中:

var tasks = delegates.Select(it => Task.Run(() => it.Invoke(sender, eventArgs)));

我们可以说包装demelect.select(async it =&gt;等待它。与嵌套异步/等待函数调用相同),所以实际上是一个非问题吗?

的 async /等待异步处理程序的 出现 ,我说它有被删除的危险(除非评论)“允许同步方法运行”;而不是按预期将它们放在返回的任务上。

是否有任何副作用?

并不真地。如果您确实使用task.run方法,则将所有处理程序都在线程池线程上调用并可能同时运行,这可能令人惊讶。

一个答案(非常感谢为讨论做出贡献)说避免异步事件,但是在其他地方,他们接受了异步事件(带有超时包装器等)等其他地方。

我100%同意该答案。

我的思考是:

观察者模式是将观察者通知状态变化的一种方式。观察者模式明显适合OOP中的“事件”:任何数量的观察者都可以订阅状态变更通知。这就是C#事件的图案:通知订阅者的事物。没有信息可以“回流”的机制。尽管语言允许 c#具有返回值的事件,但无论如何,它不是天然。异常情况也会发生相同的限制(可以认为是一种返回):标准处理程序?invoke模式开始断开(在第一个例外情况下停止调用等)。

一旦您有信息“向后流”(包括需要处理异常或需要等待所有处理程序要完成),您将不再处于观察者模式中,并且您不再是在C#活动的快乐之路。

通常,我发现这种类型的大多数“事件”通常与策略或访客模式有关,而不是观察者。策略和访客都不适合C#事件,尽管经常(可悲的是)以这种方式实施。我认为这是一个常见的设计错误(对于所有OOP语言)。

Task.WhenAll will invoke all the handlers when it reifies its parameter. It will invoke them one at a time, and then will asynchronously wait for all the tasks to complete.

The reason you were seeing the halting on the first exception was because the exception was thrown during reification. It's normal for asynchronous (Task-returning) functions to place any exceptions on the returned task. It's abnormal for asynchronous functions to throw exceptions directly.

So, this is the problematic code:

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

One of these would be correct:

async Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  throw new InvalidOperationException();
}

Task FaultyHandler1(object sender, EventArgs eventArgs)
{
  isHandler1Called = true;
  return Task.FromException(new InvalidOperationException());
}

You're seeing the odd behavior because the asynchronous handler is misbehaving (by throwing a synchronous exception).

Now, if you want to allow misbehaving asynchronous handlers, you can do that, either with an explicit try/catch or the extra async/await:

var tasks = delegates.Select(it => try { return it.Invoke(sender, eventArgs); } catch (Exception ex) { return Task.FromException(ex); });

// async/await is necessary to handle misbehaving asynchronous handlers that throw synchronous exceptions
var tasks = delegates.Select(async it => await it.Invoke(sender, eventArgs));

If you do keep the async/await approach, please do comment it, because coding constructs like that are often assumed to be spurious and may be removed by a future maintainer.

The WithAggregatedExceptions looks fine as-is, but it can be simplified if you want:

static async Task WithAggregatedExceptions(this Task task)
{
  try { await task; }
  catch { throw task.Exception; }
}

My issue is subscribers of this event, writing synchronous handlers over which I have no control - this would stop the other event handlers (sync and async) running that are attached to the same event.

Well, yes. Task.WhenAll reifies its collection of tasks synchronously, which invokes all the handlers one at a time.

If you want to allow synchronous handlers as well as asynchronous ones all at the same time, you can wrap the invocations in a Task.Run:

var tasks = delegates.Select(it => Task.Run(() => it.Invoke(sender, eventArgs)));

Can we say that wrapping the delegates.Select(async it => await it.Invoke(sender, eventArgs) with async/await allows synchronous method to run, and at worst(?) wrap twice an async method (which is the same as nesting async/await function calls) so is actually a non-issue?

The extra async/await for asynchronous handlers is a non-issue; it's very slightly less efficient, and appears unnecessary, so I'd say it's in danger of being removed (unless commented). It doesn't "allow synchronous methods to run"; instead, it corrects the misbehaving methods that throw exceptions directly instead of placing them on the returned Task as expected.

Are there any side effects that have been introduced?

Not really. If you do use the Task.Run approach, then all the handlers are invoked on thread pool threads and may run concurrently, which may be surprising.

one answer (much appreciated for contributing to the discussion) says to avoid async events, yet in other places like the discord c# client they have embraced async events (with timeout wrappers etc).

I am 100% in agreement with that answer.

Here's how I think about it:

The Observer pattern is a way to notify observers of state changes. The Observer pattern is a clear fit for "events" in OOP: any number of observers may subscribe to state change notifications. This is what C# events were patterned after: notifying subscribers of things. There's no mechanism for information to "flow back". While the language allows C# events with return values, it's not natural by any means. The same limitation happens with exceptions (which can be considered a kind of return): the standard handler?.Invoke pattern starts to break (stopping invocations at the first exception, etc).

As soon as you have information "flowing back" (including needing to handle exceptions, or needing to await all the handlers to complete), you're no longer in the Observer pattern, and you are no longer in the happy path of C# events.

Generally, I find most "events" of this type are usually related to a Strategy or Visitor pattern, rather than Observer. Neither Strategy nor Visitor are good fits for C# events, although they are often (sadly) implemented that way. I consider that a common design mistake (for all OOP languages).

长伴 2025-02-09 00:30:06

我认为,以异步方式使用C#事件的设计并不强大,并且它总是以一种不受控制的方式行事。有更好的技术可以使事件处理强大。

最好的技术之一是TPL DataFlows( https://learn.microsoft.com/en-us/dotnet/standard/parallelal-programming/dataflow-task-parellell-library-library )。该库允许您以非常控制的方式进行编程流处理,它可以帮助您处理任务调度程序等。成功应用此过程后,问题中的所有问题都将被解决。

显然还有其他替代方案,但是我显然会使用C#事件来弃权。

In my opinion, the design of using C# events in an async way is not robust, and it will always behave in a slightly uncontrolled way. There are better techniques to make event processing robust.

One of the best such technologies is TPL Dataflows (https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library). This library allows you to program stream processing in a very controlled way, it helps you deal with task schedulers etc. Once you apply this successfully, all the problems in your question will be addressed.

There are obviously other alternatives out there, but I would clearly abstain from re-implementing this by using C# events....

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文