SQL Server 服务代理

发布于 2024-11-16 08:24:44 字数 2507 浏览 2 评论 0原文

目前我们正在使用服务代理来来回发送消息,效果很好。但我们想使用 RELATED_CONVERSATION_GROUP 对这些消息进行分组。我们想使用我们自己的数据库持久化 uuid 作为数据库中的 RELATED_CONVERSATION_GROUP = @uuid,但即使每次我们收到队列时 conversion_group_id 都不同,我们也使用相同的 uuid。

你们知道我创建代理或接收调用的方式有什么问题吗?我在下面提供了代理创建代码和接收调用代码。感谢

下面是代码“Service Broker 创建代码”

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))

下面是代码“接收代码”

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT

详细说明

我们的情况是,我们需要循环接收来自此 Service Broker 队列的项目,在 WAITFOR 上阻塞,并且通过不可靠的网络将它们移交给另一个系统。从队列接收到的项目的目的地是与该远程系统的多个连接之一。如果该项目未成功传送到其他系统,则应回滚该单个项目的事务,并且该项目将返回到队列。我们在成功交付后提交事务,解锁后续循环迭代将拾取的消息序列。

相关项目序列中的延迟不应影响不相关序列的交付。单个项目一旦可用就会立即发送到队列中并立即转发。项目应以单文件形式转发,但即使在一个序列中,交付顺序也并不严格重要。

在一次接收一条消息的循环中,从我们的打开连接列表中选择一个新的或现有的 TcpClient,并且消息和打开的连接通过异步 IO 回调链传递,直到传输完成。然后我们完成数据库事务,在该事务中我们从 Service Broker 队列接收了项目。

如何使用 Service Broker 和对话组来帮助解决这种情况?

Currently we are using service broker to send the messages back and forth, which is working fine. But we wanted to group those messages by using the RELATED_CONVERSATION_GROUP. We wanted to use our own database persisted uuid as a RELATED_CONVERSATION_GROUP = @uuid from our database, but even though we use the same uuid every time the conversion_group_id comes different each time we receive the queue.

Do you guys know what is wrong with way i am creating the broker or the receive call, i have provided both the broker creation code and the receive call code below. Thanks

below is the code "Service Broker creation code"

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))

below is the code "Receive code"

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT

Elaboration

Our situation is that we need to receive items from this Service Broker queue in a loop, blocking on WAITFOR, and hand them off to another system over an unreliable network. Items received from the queue are destined for one of many connections to that remote system. If the item is not successfully delivered to the other system, the transaction for that single item should be rolled back and the item will be returned to the queue. We commit the transaction upon successful delivery, unlocking the sequence of messages to be picked up by a subsequent loop iteration.

Delays in a sequence of related items should not affect delivery of unrelated sequences. Single items are sent into the queue as soon as they are available and are forwarded immediately. Items should be forwarded single-file, though order of delivery even within a sequence is not strictly important.

From the loop that receives one message at a time, a new or existing TcpClient is selected from our list of open connections, and the message and the open connection are passed along though the chain of asynchronous IO callbacks until the transmission is complete. Then we complete the DB Transaction in which we received the Item from the Service Broker Queue.

How can Service Broker and conversation groups be used to assist in this scenario?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

━╋う一瞬間旳綻放 2024-11-23 08:24:44

会话组只是一个本地概念,专门用于锁定:相关会话属于一个组,这样当您在一个会话上处理消息时,另一个线程无法处理相关消息。没有有关两个端点交换的会话组的信息,因此在您的示例中,所有发起方端点最终都属于一个会话组,但目标端点都是一个不同的会话组(每个组只有一个会话)。系统这样做的原因是因为对话组旨在解决诸如旅行预订服务之类的问题:当它收到“预订旅行”的消息时,它必须预订航班、酒店和汽车出租。它必须发送三条消息,每一条消息发送给这些服务(“航班”、“酒店”、“汽车”),然后响应将异步返回。当它们返回时,处理必须确保它们不会由单独的线程同时处理,每个线程都会尝试更新“行程”记录状态。在消息传递中,这个问题被称为“消息关联问题”。

然而,会话组通常仅出于性能原因而部署在 SSB 中:它们允许更大的 RECEIVE 结果。可以使用 MOVE CONVERSATION 但实际上有一个更简单的技巧:反转对话的方向。让您的目的地启动对话(分组),发送关于目的地启动的对话的“更新”。

一些注意事项:

  • 不要使用 BEGIN/SEND/END 的即发即忘模式。您将无法在将来诊断任何问题,请参阅即发即忘:适用于军队,但不适用于 Service Broker 对话
  • 切勿在生产代码中使用WITH CLEANUP。它旨在用于灾难恢复等行政最后手段。如果您滥用它,您就拒绝了 SSB 正确跟踪消息以进行正确重试传递的任何机会(如果消息在目标上弹回,无论出于何种原因,它将永远丢失)。
  • SSB 不保证对话之间的顺序,仅保证一个对话内的顺序。为每个 INSERT 事件启动新对话并不能保证按目标保留插入操作的顺序。

Conversation groups are a local concept only, used exclusively for locking: correlated conversations belong in a group so that while you process a message on one conversation, another thread cannot process a correlated message. There is no information about conversation groups exchanged by the two endpoints, so in your example all the initiator endpoints end up belonging to one conversation group, but the target endpoints are each a distinct conversation group (each group having only one conversation). The reason the system behaves like this is because conversation groups are designed to address a problem like, say, a trip booking service: when it receives a message to 'book a trip', it has to reserve a flight, a hotel and a car rental. It must send three messages, one to each of these services ('flights', 'hotels', 'cars') and then the responses will come back, asynchronously. When they do come back, the processing must ensure that they are not processed concurrently by separate threads, which would each try to update the 'trip' record status. In messaging, this problem is know as 'message correlation problem'.

However, often conversation groups are deployed in SSB solely for performance reasons: they allow larger RECEIVE results. Target endpoints can be moved together into a group by using MOVE CONVERSATION but in practice there is a much simpler trick: reverse the direction of the conversation. Have your destination start the conversations (grouped), and the source sends its 'updates' on the conversation(s) started by the destination.

Some notes:

  • Don't use the fire-and-forget pattern of BEGIN/SEND/END. You're making it impossible to diagnose any problem in future, see Fire and Forget: Good for the military, but not for Service Broker conversations.
  • Never ever use WITH CLEANUP in production code. It is intended for administrative last-resort action like disaster recovery. If you abuse it you deny SSB any chance to properly track the message for correct retry delivery (if the message bounces on the target, for whatever reason, it will be lost forever).
  • SSB does not guarantee order across conversations, only within one conversation. Starting a new conversation for each INSERT event does not guarantee to preserve, on target, the order of insert operations.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文