Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create a durable publisher/subscriber topic using the AMQP.Net Lite library in .NET Core with clientId and subscriber name and topic name

Tags:

activemq

amqp

I am new to ActiveMQ, but I tried and am able to create a durable publisher, but I am not able to set Client Id, because I am not finding any properties with client Id and am even unable to find in Google. It will be great help if I will get some sample code.

Note: Not with the NMS protocol. I am using AMQP.Net Lite with ActiveMQ in the .NET Core Web API for creating a durable publisher/subscriber with ClientId.

like image 672
Mhasan Avatar asked May 06 '17 03:05

Mhasan


1 Answers

In order to create a durable subscription to ActiveMQ or ActiveMQ Artemis your client needs to do a couple things.

  1. Set a unique "client-id" for the client using the AMQP 'ContainerId' property which can be seen in the code below. The client must use that same container ID every time it connects and recovers it's durable subscription.

  2. Create a new Session.

  3. Create a new Receiver for the address (in this case Topic) that you want to subscribe to. The Source of a durable subscription need to have the address set to a Topic address (in ActiveMQ this is topic://name). The Source also needs the expiray policy set to NEVER, the Source must also have the terminus durability state set to UNSETTLED_STATE, and the distribution mode set to COPY.

  4. Once the Receiver is created then you can either set an onMessage handler in start or call receive to consume messages (assuming you've granted credit for the broker to send you any).


using System;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using Amqp.Sasl;
using System.Threading;

namespace aorg.apache.activemq.examples { class Program { private static string DEFAULT_BROKER_URI = "amqp://localhost:5672"; private static string DEFAULT_CONTAINER_ID = "client-1"; private static string DEFAULT_SUBSCRIPTION_NAME = "test-subscription"; private static string DEFAULT_TOPIC_NAME = "test-topic";

static void Main(string[] args) { Console.WriteLine("Starting AMQP durable consumer example."); Console.WriteLine("Creating a Durable Subscription"); CreateDurableSubscription(); Console.WriteLine("Attempting to recover a Durable Subscription"); RecoverDurableSubscription(); Console.WriteLine("Unsubscribe a durable subscription"); UnsubscribeDurableSubscription(); Console.WriteLine("Attempting to recover a non-existent durable subscription"); try { RecoverDurableSubscription(); throw new Exception("Subscription was not deleted."); } catch (AmqpException) { Console.WriteLine("Recover failed as expected"); } Console.WriteLine("Example Complete."); } // Creating a durable subscription involves creating a Receiver with a Source that // has the address set to the Topic name where the client wants to subscribe along // with an expiry policy of 'never', Terminus Durability set to 'unsettled' and the // Distribution Mode set to 'Copy'. The link name of the Receiver represents the // desired name of the Subscription and of course the Connection must carry a container // ID uniqure to the client that is creating the subscription. private static void CreateDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source source = CreateBasicSource(); // Create a Durable Consumer Source. source.Address = DEFAULT_TOPIC_NAME; source.ExpiryPolicy = new Symbol("never"); source.Durable = 2; source.DistributionMode = new Symbol("copy"); ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, source, null); session.Close(); } finally { connection.Close(); } } // Recovering an existing subscription allows the client to ask the remote // peer if a subscription with the given name for the current 'Container ID' // exists. The process involves the client attaching a receiver with a null // Source on a link with the desired subscription name as the link name and // the broker will then return a Source instance if this current container // has a subscription registered with that subscription (link) name. private static void RecoverDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } session.Close(); } finally { connection.Close(); } } // Unsubscribing a durable subscription involves recovering an existing // subscription and then closing the receiver link explicitly or in AMQP // terms the close value of the Detach frame should be 'true' private static void UnsubscribeDurableSubscription() { Connection connection = new Connection(new Address(DEFAULT_BROKER_URI), SaslProfile.Anonymous, new Open() { ContainerId = DEFAULT_CONTAINER_ID }, null); try { Session session = new Session(connection); Source recoveredSource = null; ManualResetEvent attached = new ManualResetEvent(false); OnAttached onAttached = (link, attach) => { recoveredSource = (Source) attach.Source; attached.Set(); }; ReceiverLink receiver = new ReceiverLink(session, DEFAULT_SUBSCRIPTION_NAME, (Source) null, onAttached); attached.WaitOne(10000); if (recoveredSource == null) { // The remote had no subscription matching what we asked for. throw new AmqpException(new Error()); } else { Console.WriteLine(" Receovered subscription for address: " + recoveredSource.Address); Console.WriteLine(" Recovered Source Expiry Policy = " + recoveredSource.ExpiryPolicy); Console.WriteLine(" Recovered Source Durability = " + recoveredSource.Durable); Console.WriteLine(" Recovered Source Distribution Mode = " + recoveredSource.DistributionMode); } // Closing the Receiver vs. detaching it will unsubscribe receiver.Close(); session.Close(); } finally { connection.Close(); } } // Creates a basic Source type that contains common attributes needed // to describe to the remote peer the features and expectations of the // Source of the Receiver link. private static Source CreateBasicSource() { Source source = new Source(); // These are the outcomes this link will accept. Symbol[] outcomes = new Symbol[] {new Symbol("amqp:accepted:list"), new Symbol("amqp:rejected:list"), new Symbol("amqp:released:list"), new Symbol("amqp:modified:list") }; // Default Outcome for deliveries not settled on this link Modified defaultOutcome = new Modified(); defaultOutcome.DeliveryFailed = true; defaultOutcome.UndeliverableHere = false; // Configure Source. source.DefaultOutcome = defaultOutcome; source.Outcomes = outcomes; return source; } }

}

like image 180
Tim Bish Avatar answered Jan 04 '23 00:01

Tim Bish