Currently we are using service broker to send the messages back and forth, which is working fine. But we wanted to group those messages by using the RELATED_CONVERSATION_GROUP. We wanted to use our own database persisted uuid as a RELATED_CONVERSATION_GROUP = @uuid from our database, but even though we use the same uuid every time the conversion_group_id comes different each time we receive the queue.
Do you guys know what is wrong with way i am creating the broker or the receive call, i have provided both the broker creation code and the receive call code below. Thanks
below is the code "Service Broker creation code"
CREATE PROCEDURE dbo.OnDataInserted
@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation UNIQUEIDENTIFIER
BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;
SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))
below is the code "Receive code"
WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON
BEGIN TRANSACTION;
DECLARE
@cID as uniqueidentifier,
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)
RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT
;WAITFOR (RECEIVE TOP (1)
@cID = Substring(CAST(message_body as nvarchar(max)),4,36),
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)
RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID;
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp <> ''
BEGIN
MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;
RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);
RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END
WAITFOR DELAY '000:00:10'
COMMIT
RAISERROR ('Committed' , 16, 1) WITH NOWAIT
Elaboration
Our situation is that we need to receive items from this Service Broker queue in a loop, blocking on WAITFOR, and hand them off to another system over an unreliable network. Items received from the queue are destined for one of many connections to that remote system. If the item is not successfully delivered to the other system, the transaction for that single item should be rolled back and the item will be returned to the queue. We commit the transaction upon successful delivery, unlocking the sequence of messages to be picked up by a subsequent loop iteration.
Delays in a sequence of related items should not affect delivery of unrelated sequences. Single items are sent into the queue as soon as they are available and are forwarded immediately. Items should be forwarded single-file, though order of delivery even within a sequence is not strictly important.
From the loop that receives one message at a time, a new or existing TcpClient is selected from our list of open connections, and the message and the open connection are passed along though the chain of asynchronous IO callbacks until the transmission is complete. Then we complete the DB Transaction in which we received the Item from the Service Broker Queue.
How can Service Broker and conversation groups be used to assist in this scenario?
Conversation groups are a local concept only, used exclusively for locking: correlated conversations belong in a group so that while you process a message on one conversation, another thread cannot process a correlated message. There is no information about conversation groups exchanged by the two endpoints, so in your example all the initiator endpoints end up belonging to one conversation group, but the target endpoints are each a distinct conversation group (each group having only one conversation). The reason the system behaves like this is because conversation groups are designed to address a problem like, say, a trip booking service: when it receives a message to 'book a trip', it has to reserve a flight, a hotel and a car rental. It must send three messages, one to each of these services ('flights', 'hotels', 'cars') and then the responses will come back, asynchronously. When they do come back, the processing must ensure that they are not processed concurrently by separate threads, which would each try to update the 'trip' record status. In messaging, this problem is know as 'message correlation problem'.
However, often conversation groups are deployed in SSB solely for performance reasons: they allow larger RECEIVE results. Target endpoints can be moved together into a group by using MOVE CONVERSATION
but in practice there is a much simpler trick: reverse the direction of the conversation. Have your destination start the conversations (grouped), and the source sends its 'updates' on the conversation(s) started by the destination.
Some notes:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With