
How do I make my ActiveMQ broker drop offline durable subscribers

We have an ActiveMQ broker that a wide variety of clients connect to using JMS, AMQP, and MQTT. For some reason we haven't figured out yet, a specific set of MQTT clients often (not always) subscribes durably. This is a test environment where clients are added and removed quite often, the latter sometimes by pulling the plug or rebooting an embedded device, so they cannot unsubscribe properly. The effect (IIUC) is that the broker piles up "offline durable subscriptions" for devices it might never see again (I can see these under http://my_broker:8161/admin/subscribers.jsp) and keeps the messages on those topics forever, until it finally breaks down under its own memory footprint.

The root issue is that these clients subscribe durably, and we need to find out why. However, it was also decided that clients doing this (unwittingly) shouldn't bring the broker to a grinding halt, so we need to solve this problem independently.

I have found there are settings for a timeout for offline durable subscriptions and put those into our broker configuration (last two lines):

<broker
  xmlns="http://activemq.apache.org/schema/core" 
  brokerName="my_broker"
  dataDirectory="${activemq.data}" 
  useJmx="true"
  advisorySupport="false" 
  persistent="false"
  offlineDurableSubscriberTimeout="1800000"
  offlineDurableSubscriberTaskSchedule="60000">

If I understand correctly, the above should check every minute and dismiss clients it hasn't seen for half an hour. However, contrary to the docs, this doesn't seem to work: A consumer I had subscribe and then pulled the plug on days ago is still visible in the list of offline durable subscribers, the broker's memory footprint is constantly increasing, and if I delete subscribers manually in the broker's web interface I can see the memory footprint going down.
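
As a sanity check on the units, here is a minimal sketch that converts the two values, assuming both attributes are interpreted as milliseconds (which matches the reading above). `OfflineTimeoutMath` is a hypothetical helper, not part of ActiveMQ, and the worst-case figure assumes the cleanup task simply runs once per schedule period:

```java
import java.time.Duration;

// Hypothetical helper, not part of ActiveMQ: treats both attributes as milliseconds.
final class OfflineTimeoutMath {

    // Assumption: a subscription becomes eligible once it has been offline for the
    // timeout, and is then removed on the next scheduled run, so the delay is at
    // most roughly timeout + one schedule period.
    static Duration worstCaseRemovalDelay(long timeoutMillis, long scheduleMillis) {
        return Duration.ofMillis(timeoutMillis).plus(Duration.ofMillis(scheduleMillis));
    }

    public static void main(String[] args) {
        long timeout = 1_800_000L; // offlineDurableSubscriberTimeout from the config above
        long schedule = 60_000L;   // offlineDurableSubscriberTaskSchedule from the config above

        System.out.println("timeout  = " + Duration.ofMillis(timeout).toMinutes() + " min");
        System.out.println("schedule = " + Duration.ofMillis(schedule).getSeconds() + " s");
        System.out.println("worst case removal after about "
                + worstCaseRemovalDelay(timeout, schedule).toMinutes() + " min");
    }
}
```

Under these assumptions, a subscriber that has been offline for days is far past the point where it should have been removed.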

So here are my questions:

  1. What determines whether an MQTT subscription to a topic on an ActiveMQ broker is durable?
  2. What am I doing wrong in setting up the timeout for dropping offline durable subscriptions in the ActiveMQ settings?
sbi asked Nov 10 '16 14:11



1 Answer

I extracted the relevant code (doCleanup(), shown below) that removes timed-out durable subscriptions.

When it finds an inactive subscription that has timed out, it logs:

    LOG.info("Destroying durable subscriber due to inactivity: {}", sub);

If removing that subscription fails, it logs:

    LOG.error("Failed to remove inactive durable subscriber", e);

Look for these lines in your broker log and match them against what you see in the admin/subscribers.jsp viewer. If neither line appears, the subscriptions might be remaining active for some reason, or you may have stumbled onto a bug.
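
A minimal sketch for doing that search, using only the JDK. `CleanupLogScan` is a hypothetical helper, and the log file path is an assumption (pass your broker's actual log file as the first argument); the two search strings are the message texts from the code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: filters broker log lines for the two cleanup messages.
final class CleanupLogScan {
    static final String REMOVED = "Destroying durable subscriber due to inactivity";
    static final String FAILED = "Failed to remove inactive durable subscriber";

    // Returns only the lines that contain one of the two cleanup messages.
    static List<String> matchingLines(List<String> lines) {
        return lines.stream()
                .filter(line -> line.contains(REMOVED) || line.contains(FAILED))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the log location depends on your installation; "activemq.log"
        // in the working directory is only a placeholder default.
        Path log = Paths.get(args.length > 0 ? args[0] : "activemq.log");
        matchingLines(Files.readAllLines(log)).forEach(System.out::println);
    }
}
```

No output at all would mean that neither message was logged in that file.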

Also, could you try removing the underscore (_) from the broker name? The manual mentions problems with underscores in broker names.

Code:

public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
   super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
   if (broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule() != -1 && broker.getBrokerService().getOfflineDurableSubscriberTimeout() != -1) {
      this.cleanupTimer = new Timer("ActiveMQ Durable Subscriber Cleanup Timer", true);
      this.cleanupTask = new TimerTask() {
         @Override
         public void run() {
            doCleanup();
         }
      };
      this.cleanupTimer.schedule(cleanupTask, broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule(), broker.getBrokerService().getOfflineDurableSubscriberTaskSchedule());
   }
}

public void doCleanup() {
   long now = System.currentTimeMillis();
   for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
      DurableTopicSubscription sub = entry.getValue();
      if (!sub.isActive()) {
         long offline = sub.getOfflineTimestamp();
         if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
            LOG.info("Destroying durable subscriber due to inactivity: {}", sub);
            try {
               RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
               info.setClientId(entry.getKey().getClientId());
               info.setSubscriptionName(entry.getKey().getSubscriptionName());
               ConnectionContext context = new ConnectionContext();
               context.setBroker(broker);
               context.setClientId(entry.getKey().getClientId());
               removeSubscription(context, info);
            } catch (Exception e) {
               LOG.error("Failed to remove inactive durable subscriber", e);
            }
         }
      }
   }
}

// The toString method for DurableTopicSubscription class
@Override
public synchronized String toString() {
    return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount() + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}
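
The conditions in the constructor and in doCleanup() above can be restated as a small standalone sketch, which may help when working out why a given subscription is not removed. `CleanupDecision` and its `Outcome` names are hypothetical and only mirror the checks in the excerpt; this is not ActiveMQ code:

```java
// Hypothetical restatement of the checks in the excerpt above, JDK only.
final class CleanupDecision {
    enum Outcome { SKIPPED_ACTIVE, SKIPPED_NO_OFFLINE_TIMESTAMP, SKIPPED_NOT_TIMED_OUT, REMOVE }

    // Mirrors the constructor guard: the timer is only created when neither setting is -1.
    static boolean cleanupEnabled(long scheduleMillis, long timeoutMillis) {
        return scheduleMillis != -1 && timeoutMillis != -1;
    }

    // Mirrors the per-subscription checks in doCleanup(), in the same order.
    static Outcome decide(boolean active, long offlineTimestamp, long now, long timeoutMillis) {
        if (active) return Outcome.SKIPPED_ACTIVE;
        if (offlineTimestamp == -1) return Outcome.SKIPPED_NO_OFFLINE_TIMESTAMP;
        if (now - offlineTimestamp < timeoutMillis) return Outcome.SKIPPED_NOT_TIMED_OUT;
        return Outcome.REMOVE;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long timeout = 1_800_000L;               // value from the question's config
        long twoDays = 2L * 24 * 60 * 60 * 1000; // "pulled the plug days ago"

        System.out.println(cleanupEnabled(60_000L, timeout));           // true
        System.out.println(decide(false, now - twoDays, now, timeout)); // REMOVE
        System.out.println(decide(false, -1, now, timeout));            // SKIPPED_NO_OFFLINE_TIMESTAMP
        System.out.println(decide(true, now - twoDays, now, timeout));  // SKIPPED_ACTIVE
    }
}
```

Going by the excerpt alone, a subscription that stays listed for days without either log line appearing would have to fall into one of the skipped outcomes, or the timer was never created.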
blackpen answered Sep 19 '22 03:09