ServiceBusTrigger: receiving one message at a time per session

Posted on 2025-01-24 04:11:03

I have an Azure Function with a ServiceBusTrigger.

It is configured to receive sessions, and the messages I send are enqueued with the correct "SessionId".

However, when the trigger runs, it processes only one session message at a time.

Can someone help me? I want all session messages to be processed at the same time.

Below are the code snippets for the trigger and for sending the messages.

ServiceBusTrigger:

public async Task RunAsync([ServiceBusTrigger("%" + AzureServiceBusConfiguration.ServiceBusBlobMigrationQueueNameSecretName + "%", Connection = AzureServiceBusConfiguration.ServiceBusConnectionSecretName, IsSessionsEnabled = true)] string messageBrokerMessageString, FunctionContext context)
{
//CODE
}

Message Insertion:

    for (int chunkIndex = 0; chunkIndex < chunks.Count(); chunkIndex++)
    {
        string workerNodeName = _workerManager.GetWorkerNodeName(controlMetadataEntity.GetBlobMigrationStrategyIdentifier(), Convert.ToUInt32(chunkIndex), controlMetadataEntity.RowKey);

        foreach (BlobMetadataEntity blob in chunks.ElementAt(chunkIndex))
        {
            ServiceBusMessage message = new ServiceBusMessage()
            {
                // The message ID is necessary to prevent duplicate entries in message broker.
                // See https://learn.microsoft.com/en-us/azure/service-bus-messaging/duplicate-detection.
                MessageId = string.Join('|', blob.PartitionKey, blob.RowKey),

                // The body is the JSON serialization of an object, converted to UTF-8 bytes.
                Body = new BinaryData(Encoding.UTF8.GetBytes(new BlobExecutionQueueMessage(blob.PartitionKey, blob.RowKey, workerNodeName, null).GetObjectAsJsonAsync())),
                ContentType = MediaTypeNames.Application.Json,

                // The session ID separates the messages into logical queues to control the parallelism per execution.
                // See https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-sessions.
                SessionId = workerNodeName
            };

            // Set user properties to identify this blob in message broker.
            message.ApplicationProperties.Add("ExecutionId", blob.PartitionKey);
            message.ApplicationProperties.Add("BlobId", blob.RowKey);

            // Add the "SendMessageAsync" task to the list so that all tasks can be
            // awaited after every message has been queued in the list.
            messageBrokerInsertions.Add(_azureServiceBusClient.SendMessageAsync(serviceBusClient, sender, message));
        }
    }

My host.json:

{
    "version": "2.0",
    "logging": {
        "applicationInsights": {
            "samplingExcludedTypes": "Request",
            "samplingSettings": {
                "isEnabled": true
            }
        },
        "logLevel": {
            "Default": "Information"
        }
    },
    "extensions": {
        "durableTask": {
            "maxConcurrentActivityFunctions": 500,
            "maxConcurrentOrchestratorFunctions": 500
        },
        "serviceBus": {
            "sessionHandlerOptions": {
                "maxConcurrentSessions": 1
            }
        }
    }
}


Comments (1)

世界和平 2025-01-31 04:11:03

This is a bit tricky. As far as I know, this is by design, to preserve the order of messages within each session with respect to the consumers. You could increase the number of concurrent sessions:
"maxConcurrentSessions": 16

This will process the sessions in parallel, while the messages within each session are still handled in order. Additionally, if you are on a Consumption plan you don't have to worry about scaling, since the queue length is taken into account when scaling. If you don't care about ordering, you could switch to a non-session-enabled queue and receive the messages in batches. Please note this line in the documentation:
https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus?tabs=in-process%2Cextensionv5%2Cextensionv3&pivots=programming-language-powershell#hostjson-settings

"When you set the isSessionsEnabled property or attribute on the trigger to true, the sessionHandlerOptions is honored. When you set the isSessionsEnabled property or attribute on the trigger to false, the messageHandlerOptions is honored."
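
A note on where this setting lives, as a hedged sketch rather than a confirmed fix: the sentence quoted above describes the older host.json layout used in the question, with "maxConcurrentSessions" nested under "sessionHandlerOptions". Assuming the function app uses version 5.x of the Service Bus extension (the question does not say which version is installed), the setting is expected directly under "serviceBus" instead, so the change would look roughly like this:

```json
{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "maxConcurrentSessions": 16
        }
    }
}
```

With the older extension versions, the value in the question's "sessionHandlerOptions" block would be raised from 1 to 16 instead. The value 16 is only the example from this answer; the limit applies per function host instance, so the total across a scaled-out app can be higher.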

Some reference links for further reading:
Queues and topics:
https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions

maxConcurrentSessions:
https://learn.microsoft.com/en-us/dotnet/api/microsoft.servicebus.messaging.sessionhandleroptions.maxconcurrentsessions?view=azure-dotnet#microsoft-servicebus-messaging-sessionhandleroptions-maxconcurrentsessions
