自定义“收到信号”活动不起作用

发布于 01-14 02:35 字数 4721 浏览 6 评论 0 原文

我想实现一个自定义活动,其行为类似于 Signal Received——也就是说,它会“暂停”工作流程,并在我从客户端(Postman)调用时恢复并继续执行。 我不想使用现有的 Signal Received,因为我想在同一个活动中、挂起工作流程之前执行一些额外的逻辑。 我遵循了此处的指南。

因此,我基于默认的 SignalReceived 活动实现了 Signal Custom 活动。

using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;

namespace Elsa.Workflows.CustomActivities.Signals
{
    /// <summary>
    /// Custom trigger activity that suspends the workflow until a <see cref="Signal"/> input
    /// carrying a matching signal name arrives, then exposes the signal's payload as output.
    /// </summary>
    [Trigger(
        Category = "Custom",
        DisplayName = "Signal Custom",
        Description = "Custom - Suspend workflow execution until the specified signal is received.",
        Outcomes = new[] { OutcomeNames.Done }
    )]
    public class SignalCustom : Activity
    {
        /// <summary>Name of the signal this activity waits for.</summary>
        [ActivityInput(
            Hint = "The name of the signal to wait for.",
            SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid }
        )]
        public string Signal { get; set; } = default!;

        /// <summary>Payload that accompanied the received signal.</summary>
        [ActivityOutput(Hint = "The input that was received with the signal.")]
        public object SignalInput { get; set; }

        /// <summary>Same payload as <see cref="SignalInput"/>, exposed as the general output.</summary>
        [ActivityOutput]
        public object Output { get; set; }

        /// <summary>
        /// Allows execution only when the current input is a signal whose name equals
        /// <see cref="Signal"/>, compared case-insensitively (ordinal).
        /// </summary>
        protected override bool OnCanExecute(ActivityExecutionContext context) =>
            context.Input is Signal incoming
            && string.Equals(incoming.SignalName, Signal, StringComparison.OrdinalIgnoreCase);

        /// <summary>
        /// On the workflow's first pass the signal is handled immediately via
        /// <see cref="OnResume"/>; on any other pass the activity suspends.
        /// </summary>
        protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
        {
            if (context.WorkflowExecutionContext.IsFirstPass)
                return OnResume(context);

            return Suspend();
        }

        /// <summary>
        /// Copies the received signal's payload into both output properties, logs the
        /// <see cref="Output"/> value and completes the activity.
        /// </summary>
        protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
        {
            var payload = context.GetInput<Signal>()!.Input;

            SignalInput = payload;
            Output = payload;
            context.LogOutputProperty(this, nameof(Output), Output);

            return Done();
        }
    }
}

我还创建了SignalCustomBookmark.cs

using Elsa.Services;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    /// <summary>
    /// Bookmark type that <c>SignalCustomBookmarkProvider</c> produces for the
    /// <c>SignalCustom</c> activity.
    /// </summary>
    public class SignalCustomBookmark : IBookmark
    {
        /// <summary>
        /// Name of the signal this bookmark stands for. The provider in this file
        /// stores it lower-cased and trimmed.
        /// </summary>
        public string Signal { get; set; } = default!;
    }
}

SignalCustomBookmarkProvider.cs

using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    /// <summary>
    /// Produces <see cref="SignalCustomBookmark"/> instances for <see cref="SignalCustom"/> activities.
    /// </summary>
    public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
    {
        /// <summary>
        /// Materializes the bookmarks yielded by <see cref="GetBookmarksInternalAsync"/> into a list.
        /// </summary>
        public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken)
        {
            var bookmarks = GetBookmarksInternalAsync(context, cancellationToken);
            return await bookmarks.ToListAsync(cancellationToken);
        }

        /// <summary>
        /// Reads the activity's signal name, normalizes it (lower-case, trimmed) and yields a
        /// single bookmark for it; yields nothing when the name is null or empty.
        /// </summary>
        private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            var rawSignal = await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken);
            var normalizedSignal = rawSignal?.ToLowerInvariant().Trim();

            // A bookmark is only meaningful when there is a signal name to match on.
            if (!string.IsNullOrEmpty(normalizedSignal))
                yield return Result(new SignalCustomBookmark { Signal = normalizedSignal });
        }
    }
}

我还在ConfigureServices(IServiceCollection services)中注册了新书签:

services.AddBookmarkProvider<SignalCustomBookmarkProvider>();

我创建了测试工作流程并添加此自定义Signal Custom,并将test-signal作为信号。

我可以使用 API 启动工作流程,并且工作正常 - 一旦实例到达该 Signal Custom,工作流程就会暂停。 我使用来自 Postman 的此调用启动工作流程:

https://localhost:5001/v1/workflows/{workflow_id}/execute

但随后我想通过用

https://localhost:5001/v1/signals/test-signal/execute

此正文

{
    "WorkflowInstanceId": "{Workflow_Instance_Id}"
}

触发此调用来恢复它。Postman 返回 200 OK,响应正文如下:

{
    "$id": "1",
    "startedWorkflows": []
}

您能否指导我如何从客户端恢复工作流程?

I want to implement a custom activity that behaves like Signal Received – meaning it “suspends” the workflow, and then resumes and continues when I call it from the client side (Postman).
I don’t want to use the existing Signal Received since I want to have some additional logic before suspending the workflow in the same action.
I followed the guidelines from here.

So I implemented Signal Custom activity based on SignalReceived default activity.

using System;
using Elsa.Activities.Signaling.Models;
using Elsa.ActivityResults;
using Elsa.Attributes;
using Elsa.Expressions;
using Elsa.Services;
using Elsa.Services.Models;

namespace Elsa.Workflows.CustomActivities.Signals
{
    /// <summary>
    /// Custom trigger activity that suspends the workflow until a <see cref="Signal"/> input
    /// carrying a matching signal name arrives, then exposes the signal's payload as output.
    /// </summary>
    [Trigger(
        Category = "Custom",
        DisplayName = "Signal Custom",
        Description = "Custom - Suspend workflow execution until the specified signal is received.",
        Outcomes = new[] { OutcomeNames.Done }
    )]
    public class SignalCustom : Activity
    {
        /// <summary>Name of the signal this activity waits for.</summary>
        [ActivityInput(
            Hint = "The name of the signal to wait for.",
            SupportedSyntaxes = new[] { SyntaxNames.JavaScript, SyntaxNames.Liquid }
        )]
        public string Signal { get; set; } = default!;

        /// <summary>Payload that accompanied the received signal.</summary>
        [ActivityOutput(Hint = "The input that was received with the signal.")]
        public object SignalInput { get; set; }

        /// <summary>Same payload as <see cref="SignalInput"/>, exposed as the general output.</summary>
        [ActivityOutput]
        public object Output { get; set; }

        /// <summary>
        /// Allows execution only when the current input is a signal whose name equals
        /// <see cref="Signal"/>, compared case-insensitively (ordinal).
        /// </summary>
        protected override bool OnCanExecute(ActivityExecutionContext context) =>
            context.Input is Signal incoming
            && string.Equals(incoming.SignalName, Signal, StringComparison.OrdinalIgnoreCase);

        /// <summary>
        /// On the workflow's first pass the signal is handled immediately via
        /// <see cref="OnResume"/>; on any other pass the activity suspends.
        /// </summary>
        protected override IActivityExecutionResult OnExecute(ActivityExecutionContext context)
        {
            if (context.WorkflowExecutionContext.IsFirstPass)
                return OnResume(context);

            return Suspend();
        }

        /// <summary>
        /// Copies the received signal's payload into both output properties, logs the
        /// <see cref="Output"/> value and completes the activity.
        /// </summary>
        protected override IActivityExecutionResult OnResume(ActivityExecutionContext context)
        {
            var payload = context.GetInput<Signal>()!.Input;

            SignalInput = payload;
            Output = payload;
            context.LogOutputProperty(this, nameof(Output), Output);

            return Done();
        }
    }
}

I created the SignalCustomBookmark.cs:

using Elsa.Services;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    /// <summary>
    /// Bookmark type that <c>SignalCustomBookmarkProvider</c> produces for the
    /// <c>SignalCustom</c> activity.
    /// </summary>
    public class SignalCustomBookmark : IBookmark
    {
        /// <summary>
        /// Name of the signal this bookmark stands for. The provider in this file
        /// stores it lower-cased and trimmed.
        /// </summary>
        public string Signal { get; set; } = default!;
    }
}

and SignalCustomBookmarkProvider.cs as well:

using Elsa.Services;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Elsa.Workflows.CustomActivities.Signals.Bookmark
{
    /// <summary>
    /// Produces <see cref="SignalCustomBookmark"/> instances for <see cref="SignalCustom"/> activities.
    /// </summary>
    public class SignalCustomBookmarkProvider : BookmarkProvider<SignalCustomBookmark, SignalCustom>
    {
        /// <summary>
        /// Materializes the bookmarks yielded by <see cref="GetBookmarksInternalAsync"/> into a list.
        /// </summary>
        public override async ValueTask<IEnumerable<BookmarkResult>> GetBookmarksAsync(BookmarkProviderContext<SignalCustom> context, CancellationToken cancellationToken)
        {
            var bookmarks = GetBookmarksInternalAsync(context, cancellationToken);
            return await bookmarks.ToListAsync(cancellationToken);
        }

        /// <summary>
        /// Reads the activity's signal name, normalizes it (lower-case, trimmed) and yields a
        /// single bookmark for it; yields nothing when the name is null or empty.
        /// </summary>
        private async IAsyncEnumerable<BookmarkResult> GetBookmarksInternalAsync(BookmarkProviderContext<SignalCustom> context, [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            var rawSignal = await context.ReadActivityPropertyAsync(x => x.Signal, cancellationToken);
            var normalizedSignal = rawSignal?.ToLowerInvariant().Trim();

            // A bookmark is only meaningful when there is a signal name to match on.
            if (!string.IsNullOrEmpty(normalizedSignal))
                yield return Result(new SignalCustomBookmark { Signal = normalizedSignal });
        }
    }
}

I also registered the new bookmark in ConfigureServices(IServiceCollection services):

services.AddBookmarkProvider<SignalCustomBookmarkProvider>();

I created a test workflow and added this custom Signal Custom activity with test-signal as the signal.

I can start the workflow using the API and it works fine - once the instance gets to that Signal Custom activity, the workflow becomes suspended.
I start the workflow using this call from Postman:

https://localhost:5001/v1/workflows/{workflow_id}/execute

But then I want to resume it by triggering this call

https://localhost:5001/v1/signals/test-signal/execute

with this body

{
    "WorkflowInstanceId": "{Workflow_Instance_Id}"
}

Postman returns 200 OK with this body

{
    "$id": "1",
    "startedWorkflows": []
}

Could you please guide me how to resume the workflow from client-side?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1)

梦忆晨望 2025-01-21 02:35:40

https://localhost:5001/v1/signals/test-signal/execute 端点对您不起作用,因为它在内部使用 ISignaler

await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)

默认实现ISignaler 依次执行以下操作:

var normalizedSignal = signal.ToLowerInvariant();

return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
    nameof(SignalReceived),
    new SignalReceivedBookmark { Signal = normalizedSignal },
    correlationId,
    workflowInstanceId,
    default,
    TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);

请注意,上面的代码片段正在使用 SignalReceived 活动和 SignalReceivedBookmark 书签构建工作流查询。

这些查询参数与您触发在自定义 SignalCustom 活动和 SignalCustomBookmark 书签类型上启动或阻止的工作流程所需的查询参数不同。

换句话说,您还需要执行两个步骤才能使其正常工作:

  1. 实现自定义控制器(假设您想从控制器触发自定义信号;)
  2. 自己调用 IWorkflowLaunchpad.CollectAndExecuteWorkflowsAsync,而不是依赖 ISignaler

更好的是:定义一个名为 ICustomSignaler 的新服务并让其实现来完成工作。

例如:

/// <summary>
/// Service for triggering the custom signal handled by the <see cref="SignalCustom"/> activity.
/// </summary>
public interface ICustomSignaler
{
   /// <summary>
   /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
   /// </summary>
   /// <param name="signal">Name of the signal to trigger (the CustomSignaler implementation lower-cases it before querying).</param>
   /// <param name="input">Optional payload delivered together with the signal.</param>
   /// <param name="workflowInstanceId">Optional workflow instance ID passed on to the workflow query.</param>
   /// <param name="correlationId">Optional correlation ID passed on to the workflow query.</param>
   /// <param name="cancellationToken">Token used to cancel the operation.</param>
   /// <returns>The workflows that were collected for the signal.</returns>
   Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}

/// <summary>
/// Default <see cref="ICustomSignaler"/> implementation: queries for workflows that start with
/// or are blocked on <see cref="SignalCustom"/> (matched through <see cref="SignalCustomBookmark"/>)
/// and executes them via the workflow launchpad.
/// </summary>
public class CustomSignaler : ICustomSignaler
{
    // No tenant filter is applied.
    // NOTE(review): replace with a real tenant lookup if multi-tenancy is in use.
    private const string? TenantId = default;

    // NOTE(review): IWorkflowLaunchpad is expected to come from the Elsa.Services namespace - confirm the using directive.
    private readonly IWorkflowLaunchpad _workflowLaunchpad;

    /// <summary>Creates the signaler with the launchpad used to collect and execute workflows.</summary>
    /// <param name="workflowLaunchpad">Launchpad resolved from dependency injection.</param>
    public CustomSignaler(IWorkflowLaunchpad workflowLaunchpad)
    {
        _workflowLaunchpad = workflowLaunchpad;
    }

    /// <summary>
    /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
    /// </summary>
    /// <param name="signal">Name of the signal; lower-cased before it is used in the bookmark and the workflow input.</param>
    /// <param name="input">Optional payload wrapped into the <see cref="Signal"/> passed as workflow input.</param>
    /// <param name="workflowInstanceId">Optional workflow instance ID forwarded to the query.</param>
    /// <param name="correlationId">Optional correlation ID forwarded to the query.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The workflows collected and executed for the signal.</returns>
    public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
    {
        // The bookmark provider stores signal names lower-cased, so the query must use the same form.
        var normalizedSignal = signal.ToLowerInvariant();

        return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
            nameof(SignalCustom),
            new SignalCustomBookmark { Signal = normalizedSignal },
            correlationId,
            workflowInstanceId,
            default,
            TenantId
        ), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
    }
}

您的自定义控制器可能如下所示:

/// <summary>
/// API endpoint that triggers the custom signal through <see cref="ICustomSignaler"/>.
/// </summary>
[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
    private readonly ICustomSignaler _signaler;
    private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;

    public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
    {
        _signaler = signaler;
        _serializerSettingsProvider = serializerSettingsProvider;
    }

    /// <summary>
    /// Triggers the signal named in the route and returns the workflows that were collected.
    /// </summary>
    /// <param name="signalName">Signal name taken from the route.</param>
    /// <param name="request">Request carrying the optional input, workflow instance ID and correlation ID.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    [HttpPost]
    public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
    {
        // Await the task first, then materialize with plain System.Linq; chaining ToList()
        // directly onto the Task only compiles when a third-party Task-LINQ extension is imported.
        var collectedWorkflows = await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken);
        var result = collectedWorkflows.ToList();

        // Nothing more can be written once the response has already started.
        if (Response.HasStarted)
            return new EmptyResult();

        return Json(
            new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
            _serializerSettingsProvider.GetSettings());
    }
}

我们正在跟踪两个问题,看看是否可以提供一种方法来允许用户扩展 SignalReceived 活动,而无需太多自定义代码:

  • https://github.com/elsa-workflows/elsa-core/issues/1035
  • https://github.com/elsa-workflows/elsa-core/issues/1053

The https://localhost:5001/v1/signals/test-signal/execute endpoint won't work for you because internally, it uses ISignaler:

await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId)

The default implementation of ISignaler, in turn, does this:

var normalizedSignal = signal.ToLowerInvariant();

return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
    nameof(SignalReceived),
    new SignalReceivedBookmark { Signal = normalizedSignal },
    correlationId,
    workflowInstanceId,
    default,
    TenantId
), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);

Notice that the above code snippet is building a workflow query using the SignalReceived activity and the SignalReceivedBookmark bookmark.

These are not the same query parameters that you need in order to trigger workflows starting with or blocked on your custom SignalCustom activity and SignalCustomBookmark bookmark type.

In other words, you need to take two more steps to make it work:

  1. Implement a custom controller (assuming you want to trigger your custom signal from a controller;)
  2. Invoke IWorkflowLaunchpad.CollectAndExecuteWorkflowsAsync yourself instead of relying on ISignaler.

Better yet: define a new service called ICustomSignaler and have its implementation do the work.

For example:

/// <summary>
/// Service for triggering the custom signal handled by the <see cref="SignalCustom"/> activity.
/// </summary>
public interface ICustomSignaler
{
   /// <summary>
   /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
   /// </summary>
   /// <param name="signal">Name of the signal to trigger (the CustomSignaler implementation lower-cases it before querying).</param>
   /// <param name="input">Optional payload delivered together with the signal.</param>
   /// <param name="workflowInstanceId">Optional workflow instance ID passed on to the workflow query.</param>
   /// <param name="correlationId">Optional correlation ID passed on to the workflow query.</param>
   /// <param name="cancellationToken">Token used to cancel the operation.</param>
   /// <returns>The workflows that were collected for the signal.</returns>
   Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = null, string? workflowInstanceId = null, string? correlationId = null, CancellationToken cancellationToken = default);
}

/// <summary>
/// Default <see cref="ICustomSignaler"/> implementation: queries for workflows that start with
/// or are blocked on <see cref="SignalCustom"/> (matched through <see cref="SignalCustomBookmark"/>)
/// and executes them via the workflow launchpad.
/// </summary>
public class CustomSignaler : ICustomSignaler
{
    // No tenant filter is applied.
    // NOTE(review): replace with a real tenant lookup if multi-tenancy is in use.
    private const string? TenantId = default;

    // NOTE(review): IWorkflowLaunchpad is expected to come from the Elsa.Services namespace - confirm the using directive.
    private readonly IWorkflowLaunchpad _workflowLaunchpad;

    /// <summary>Creates the signaler with the launchpad used to collect and execute workflows.</summary>
    /// <param name="workflowLaunchpad">Launchpad resolved from dependency injection.</param>
    public CustomSignaler(IWorkflowLaunchpad workflowLaunchpad)
    {
        _workflowLaunchpad = workflowLaunchpad;
    }

    /// <summary>
    /// Runs all workflows that start with or are blocked on the <see cref="SignalCustom"/> activity.
    /// </summary>
    /// <param name="signal">Name of the signal; lower-cased before it is used in the bookmark and the workflow input.</param>
    /// <param name="input">Optional payload wrapped into the <see cref="Signal"/> passed as workflow input.</param>
    /// <param name="workflowInstanceId">Optional workflow instance ID forwarded to the query.</param>
    /// <param name="correlationId">Optional correlation ID forwarded to the query.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The workflows collected and executed for the signal.</returns>
    public async Task<IEnumerable<CollectedWorkflow>> TriggerSignalAsync(string signal, object? input = default, string? workflowInstanceId = default, string? correlationId = default, CancellationToken cancellationToken = default)
    {
        // The bookmark provider stores signal names lower-cased, so the query must use the same form.
        var normalizedSignal = signal.ToLowerInvariant();

        return await _workflowLaunchpad.CollectAndExecuteWorkflowsAsync(new WorkflowsQuery(
            nameof(SignalCustom),
            new SignalCustomBookmark { Signal = normalizedSignal },
            correlationId,
            workflowInstanceId,
            default,
            TenantId
        ), new WorkflowInput(new Signal(normalizedSignal, input)), cancellationToken);
    }
}

Your custom controller could then look something like this:

/// <summary>
/// API endpoint that triggers the custom signal through <see cref="ICustomSignaler"/>.
/// </summary>
[ApiController]
[Route("custom-signals/{signalName}/execute")]
[Produces("application/json")]
public class Execute : Controller
{
    private readonly ICustomSignaler _signaler;
    private readonly IEndpointContentSerializerSettingsProvider _serializerSettingsProvider;

    public Execute(ICustomSignaler signaler, IEndpointContentSerializerSettingsProvider serializerSettingsProvider)
    {
        _signaler = signaler;
        _serializerSettingsProvider = serializerSettingsProvider;
    }

    /// <summary>
    /// Triggers the signal named in the route and returns the workflows that were collected.
    /// </summary>
    /// <param name="signalName">Signal name taken from the route.</param>
    /// <param name="request">Request carrying the optional input, workflow instance ID and correlation ID.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    [HttpPost]
    public async Task<IActionResult> Handle(string signalName, ExecuteSignalRequest request, CancellationToken cancellationToken = default)
    {
        // Await the task first, then materialize with plain System.Linq; chaining ToList()
        // directly onto the Task only compiles when a third-party Task-LINQ extension is imported.
        var collectedWorkflows = await _signaler.TriggerSignalAsync(signalName, request.Input, request.WorkflowInstanceId, request.CorrelationId, cancellationToken);
        var result = collectedWorkflows.ToList();

        // Nothing more can be written once the response has already started.
        if (Response.HasStarted)
            return new EmptyResult();

        return Json(
            new ExecuteSignalResponse(result.Select(x => new CollectedWorkflow(x.WorkflowInstanceId, x.ActivityId)).ToList()),
            _serializerSettingsProvider.GetSettings());
    }
}

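We're tracking two issues to see if we can provide a way to allow users to extend the SignalReceived activity without having too much custom code:

  • https://github.com/elsa-workflows/elsa-core/issues/1035
  • https://github.com/elsa-workflows/elsa-core/issues/1053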

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文