Cancelling an array of tasks with retries using Azure Functions

Posted on 2025-02-07 11:39:19 · 2765 characters · 1 view · 0 comments

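I'm using Azure Functions V4. In an orchestration (MyCustomOrchestration) I do a fan-out and call an activity-triggered function (HandleIdFunction). The fan-out usually contains around 20 tasks. HandleIdFunction is called with a RetryOptions and can throw different exceptions. Any exception should trigger a retry, except MyCustomError. If MyCustomError is thrown, all tasks should be aborted and stop retrying, no matter what exception they throw themselves. I can't get this to work: the retries keep going.

I could not find a way to pass a CancellationToken. And if I set a variable in Handle, it is reset during replay.

Complete reproducible example below: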

public class MyCustomError : Exception
{

}

public static class Function1
{
    [FunctionName("Function1")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
        [DurableClient] IDurableClient starter,
        ILogger log)
    {
        var result = await starter.StartNewAsync(nameof(MyCustomOrchestration));
        return new OkObjectResult(result);
    }

    [FunctionName(nameof(HandleIdFunction))]
    public static async Task HandleIdFunction([ActivityTrigger] IDurableActivityContext context)
    {
        var input = context.GetInput<int>();
        Console.WriteLine($"{nameof(HandleIdFunction)} - {input}.");
        await Task.Delay(1000);
        // Fake errors.
        if (input == 10)
            throw new MyCustomError();
        if (input % 2 == 0)
            throw new Exception();
    }

    [FunctionName(nameof(MyCustomOrchestration))]
    public static async Task MyCustomOrchestration([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var parallelTasks = new List<Task<Guid>>();

        var retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 30)
        {
            BackoffCoefficient = 1.85,
            MaxRetryInterval = TimeSpan.FromSeconds(5),
            RetryTimeout = TimeSpan.FromMinutes(1),
            Handle = exception =>
            {
                Console.WriteLine("Retry handled called");
                return (exception.InnerException is not MyCustomError);
            }
        };

        int[] idBatch = Enumerable.Range(0, 20).ToArray();
        for (int i = 0; i < idBatch.Length; i++)
        {
            var id = idBatch[i];
            Task<Guid> task = context.CallActivityWithRetryAsync<Guid>(nameof(HandleIdFunction), retryOptions, id);
            parallelTasks.Add(task);
        }

        Guid[] result = null;

        try
        {
            result = await Task.WhenAll(parallelTasks);
        }
        catch (Exception ex) when (ex.InnerException is MyCustomError)
        {
            // Custom handling.
        }
        catch (Exception ex)
        {
            // Custom handling.
        }
    }
}
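
One direction that might be worth trying, offered as a rough sketch rather than a confirmed fix: drop RetryOptions and run the retry loop inside the orchestrator, so every task can check a shared "aborted" flag before its next attempt. The AbortableRetry class below is hypothetical (it is not part of the Durable Functions API). It takes the activity call and the delay as delegates, which is an assumption made here so the block compiles without the Azure packages; in the orchestration they would wrap context.CallActivityAsync and context.CreateTimer. It also assumes, as the example above does, that the activity's exception arrives as InnerException.

```csharp
using System;
using System.Threading.Tasks;

// Repeated from the example above so this block compiles on its own.
public class MyCustomError : Exception { }

// Hypothetical helper: a manual retry loop with a flag shared by all tasks.
public sealed class AbortableRetry
{
    private readonly Func<int, Task> _callActivity;
    private readonly Func<TimeSpan, Task> _delay;
    private readonly int _maxAttempts;
    private readonly TimeSpan _retryInterval;

    public AbortableRetry(Func<int, Task> callActivity, Func<TimeSpan, Task> delay,
                          int maxAttempts, TimeSpan retryInterval)
    {
        _callActivity = callActivity;
        _delay = delay;
        _maxAttempts = maxAttempts;
        _retryInterval = retryInterval;
    }

    // Set once any task sees MyCustomError; checked before every attempt.
    public bool Aborted { get; private set; }

    // Returns true on success, false if aborted or out of attempts.
    public async Task<bool> RunAsync(int id)
    {
        for (int attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            if (Aborted) return false;
            try
            {
                await _callActivity(id);
                return true;
            }
            catch (Exception ex) when (ex is MyCustomError || ex.InnerException is MyCustomError)
            {
                Aborted = true; // stops further attempts of every task
                return false;
            }
            catch (Exception)
            {
                // Any other failure: wait, then try again if attempts remain.
                if (attempt < _maxAttempts) await _delay(_retryInterval);
            }
        }
        return false;
    }
}

// Assumed wiring inside MyCustomOrchestration (untested against a live function app):
//   var retry = new AbortableRetry(
//       id => context.CallActivityAsync(nameof(HandleIdFunction), id),
//       wait => context.CreateTimer(context.CurrentUtcDateTime.Add(wait), CancellationToken.None),
//       maxAttempts: 30, retryInterval: TimeSpan.FromSeconds(5));
//   bool[] succeeded = await Task.WhenAll(idBatch.Select(id => retry.RunAsync(id)));
```

Because the flag is set only in response to an awaited activity result, it should be rebuilt identically on replay. Note that this only prevents further attempts: activities that are already running are not cancelled, and a task waiting on a timer stops only once that timer fires.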

