
SQL Server Service Broker

We currently use Service Broker to send messages back and forth, and that works fine. We now want to group those messages with RELATED_CONVERSATION_GROUP, using a uuid persisted in our own database (RELATED_CONVERSATION_GROUP = @uuid). However, even though we pass the same uuid every time, the conversation_group_id is different each time we receive from the queue.

Does anyone know what is wrong with the way I am creating the conversation, or with the receive call? I have provided both the creation code and the receive code below. Thanks.

Below is the "Service Broker creation code":

CREATE PROCEDURE dbo.OnDataInserted
    @EntityType   NVARCHAR(100),
    @MessageID    BIGINT,
    @uuid         UNIQUEIDENTIFIER,
    @message_body NVARCHAR(MAX)
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @conversation UNIQUEIDENTIFIER;

    -- Start a dialog whose initiator endpoint joins conversation group @uuid.
    BEGIN DIALOG CONVERSATION @conversation
        FROM SERVICE DataInsertSndService
        TO SERVICE 'DataInsertRcvService'
        ON CONTRACT DataInsertContract
        WITH RELATED_CONVERSATION_GROUP = @uuid;

    -- Send the payload; the NVARCHAR(MAX) body is stored as VARBINARY(MAX) on the queue.
    SEND ON CONVERSATION @conversation
        MESSAGE TYPE DataInserted
        (@message_body);
END

Below is the "Receive code":

-- Roll back any transaction left open by a previous run.
WHILE @@TRANCOUNT > 0 ROLLBACK;
SET NOCOUNT ON;

BEGIN TRANSACTION;

DECLARE
    @cID                     UNIQUEIDENTIFIER,
    @conversationHandle      UNIQUEIDENTIFIER,
    @conversationGroupId     UNIQUEIDENTIFIER,
    @tempConversationGroupId UNIQUEIDENTIFIER,
    @message_body            VARBINARY(MAX);

-- Severity 0 prints a progress message immediately without raising an error.
RAISERROR ('Awaiting Message ...', 0, 1) WITH NOWAIT;

-- Block until a message arrives; RECEIVE locks that message's conversation group.
WAITFOR (
    RECEIVE TOP (1)
        @cID                 = SUBSTRING(CAST(message_body AS NVARCHAR(MAX)), 4, 36),
        @conversationHandle  = [conversation_handle],
        @conversationGroupId = [conversation_group_id],
        @message_body        = message_body
    FROM DataInsertRcvQueue
);

RAISERROR ('Message Received', 0, 1) WITH NOWAIT;

-- Look up the conversation group already mapped to this cID, if any.
SELECT @tempConversationGroupId = conversationGroupID
FROM ConversationGroupMapper
WHERE cID = @cID;

IF @tempConversationGroupId IS NOT NULL
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

    RAISERROR ('Moved to Existing Conversation Group', 0, 1) WITH NOWAIT;
END
ELSE
BEGIN
    INSERT INTO ConversationGroupMapper VALUES (@cID, @conversationGroupId);

    RAISERROR ('New Conversation Group', 0, 1) WITH NOWAIT;
END

-- Hold the transaction (and its conversation group lock) open for 10 seconds.
WAITFOR DELAY '000:00:10';

COMMIT;

RAISERROR ('Committed', 0, 1) WITH NOWAIT;

Elaboration

Our situation is that we need to receive items from this Service Broker queue in a loop, blocking on WAITFOR, and hand them off to another system over an unreliable network. Items received from the queue are destined for one of many connections to that remote system. If the item is not successfully delivered to the other system, the transaction for that single item should be rolled back and the item will be returned to the queue. We commit the transaction upon successful delivery, unlocking the sequence of messages to be picked up by a subsequent loop iteration.

Delays in a sequence of related items should not affect delivery of unrelated sequences. Single items are sent into the queue as soon as they are available and are forwarded immediately. Items should be forwarded single-file, though order of delivery even within a sequence is not strictly important.

From the loop that receives one message at a time, a new or existing TcpClient is selected from our list of open connections, and the message and the open connection are passed along through the chain of asynchronous IO callbacks until the transmission is complete. Then we complete the database transaction in which we received the item from the Service Broker queue.

How can Service Broker and conversation groups be used to assist in this scenario?

Gopi asked Jan 19 '23 10:01


1 Answer

Conversation groups are a purely local concept, used exclusively for locking: correlated conversations belong to one group so that while one thread processes a message on one conversation, no other thread can process a correlated message. The two endpoints exchange no information about conversation groups. So in your example all the initiator endpoints end up belonging to one conversation group, but each target endpoint is in its own distinct conversation group (each group having only one conversation).

The system behaves like this because conversation groups are designed to address a problem like, say, a trip booking service: when it receives a message to 'book a trip', it has to reserve a flight, a hotel and a car rental. It must send three messages, one to each of these services ('flights', 'hotels', 'cars'), and the responses then come back asynchronously. When they do come back, the processing must ensure that they are not handled concurrently by separate threads, each of which would try to update the 'trip' record status. In messaging, this is known as the 'message correlation problem'.
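One way to see this for yourself, as a sketch that assumes both services live in the same database: query the sys.conversation_endpoints catalog view after sending a few messages. The initiator rows should all show your uuid as conversation_group_id, while each target row should show a different, system-generated group.

```sql
-- List both ends of every dialog in the current database.
-- Assumption: DataInsertSndService and DataInsertRcvService are both hosted here.
SELECT
    ce.conversation_handle,
    ce.conversation_group_id,
    ce.is_initiator,          -- 1 = initiator endpoint, 0 = target endpoint
    ce.far_service,
    ce.state_desc
FROM sys.conversation_endpoints AS ce
ORDER BY ce.is_initiator DESC, ce.conversation_group_id;
```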

However, conversation groups are often deployed in SSB solely for performance reasons: they allow larger RECEIVE results. Target endpoints can be moved together into a group by using MOVE CONVERSATION, but in practice there is a much simpler trick: reverse the direction of the conversation. Have your destination start the conversations (grouped), and have the source send its 'updates' on the conversation(s) started by the destination.
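A minimal sketch of the reversed direction, under stated assumptions: the message types SubscribeRequest and DataInsertedUpdate and the contract ReversedDataInsertContract are hypothetical names invented for illustration, and NEWID() stands in for your persisted uuid. The destination begins the dialog in the group it wants, then sends a first message so that the source learns the conversation exists.

```sql
-- Hypothetical message types and contract: the destination initiates,
-- and the source (now the dialog target) is the side that sends updates.
CREATE MESSAGE TYPE SubscribeRequest VALIDATION = NONE;
CREATE MESSAGE TYPE DataInsertedUpdate VALIDATION = NONE;
CREATE CONTRACT ReversedDataInsertContract (
    SubscribeRequest   SENT BY INITIATOR,
    DataInsertedUpdate SENT BY TARGET
);

-- The service that is the target of the dialog must list the contract.
ALTER SERVICE DataInsertSndService (ADD CONTRACT ReversedDataInsertContract);

-- Destination side: begin the dialog inside the chosen conversation group.
DECLARE @uuid   UNIQUEIDENTIFIER = NEWID();   -- stand-in for the persisted uuid
DECLARE @handle UNIQUEIDENTIFIER;

BEGIN DIALOG CONVERSATION @handle
    FROM SERVICE DataInsertRcvService
    TO SERVICE 'DataInsertSndService'
    ON CONTRACT ReversedDataInsertContract
    WITH RELATED_CONVERSATION_GROUP = @uuid;

-- The far end only sees the dialog once a first message arrives.
SEND ON CONVERSATION @handle
    MESSAGE TYPE SubscribeRequest
    (CAST(@uuid AS NVARCHAR(36)));
```

The source would then RECEIVE the SubscribeRequest, remember the conversation_handle it arrived on, and SEND its DataInsertedUpdate messages on that handle. Because the destination's endpoint is the initiator, those updates arrive on the destination's queue with conversation_group_id equal to @uuid, and no MOVE CONVERSATION is needed.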

Some notes:

  • Don't use the fire-and-forget pattern of BEGIN/SEND/END. It makes any future problem impossible to diagnose; see Fire and Forget: Good for the military, but not for Service Broker conversations.
  • Never ever use WITH CLEANUP in production code. It is intended for administrative last-resort actions like disaster recovery. If you abuse it, you deny SSB any chance to properly track the message for correct retry delivery (if the message bounces on the target, for whatever reason, it will be lost forever).
  • SSB does not guarantee order across conversations, only within one conversation. Starting a new conversation for each INSERT event does not guarantee that the order of the insert operations is preserved on the target.
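As an illustration of the first note, here is a rough sketch of what the sending side can do instead of ending the conversation right after SEND: leave it open, and close it only when the EndDialog or Error message comes back on the initiator's queue. The queue name DataInsertSndQueue and the table dbo.BrokerErrorLog are assumptions made for this example; they do not appear in the question.

```sql
DECLARE @handle UNIQUEIDENTIFIER,
        @type   SYSNAME,
        @body   VARBINARY(MAX);

BEGIN TRANSACTION;

-- Wait up to 5 seconds for a reply on the initiator's queue.
WAITFOR (
    RECEIVE TOP (1)
        @handle = conversation_handle,
        @type   = message_type_name,
        @body   = message_body
    FROM DataInsertSndQueue            -- hypothetical initiator queue name
), TIMEOUT 5000;

IF @handle IS NOT NULL
BEGIN
    IF @type = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
    BEGIN
        -- The target finished its work and ended its side; close ours too.
        END CONVERSATION @handle;
    END
    ELSE IF @type = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    BEGIN
        -- Keep the error for diagnosis; dbo.BrokerErrorLog is a hypothetical table.
        INSERT INTO dbo.BrokerErrorLog (conversation_handle, error_xml)
        VALUES (@handle, CAST(@body AS XML));

        END CONVERSATION @handle;
    END
END

COMMIT;
```

In this arrangement the target calls END CONVERSATION once it has processed the message, and the initiator never uses WITH CLEANUP.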
Remus Rusanu answered Jan 21 '23 23:01