Google PubSub - Counting messages in topic

I've looked over the documentation for Google's PubSub, and also tried looking in Google Cloud Monitoring, but couldn't find any way to determine the queue size of my topics.

Since I plan on using PubSub for analytics, it's important for me to monitor the queue count so that I can scale the subscriber count up or down.

What am I missing?

Gil Adirim asked Feb 18 '16 07:02



5 Answers

The metric you want to look at is "undelivered messages." You should be able to set up alerts or charts that monitor this metric in Google Cloud Monitoring under the "Pub/Sub Subscription" resource type. The number of messages that have not yet been acknowledged by subscribers, i.e., the queue size, is a per-subscription metric rather than a per-topic metric. For details, see pubsub.googleapis.com/subscription/num_undelivered_messages in the GCP Metrics List, which also covers the other available Pub/Sub metrics.
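To illustrate the per-subscription point, here is a minimal sketch of a Cloud Monitoring filter string that selects this metric for a single subscription. The helper name backlog_filter and the subscription ID "my-sub" are made up for the example; the metric type, the pubsub_subscription resource type, and its subscription_id label are the ones Cloud Monitoring uses for this metric.

```python
# Metric that reports the number of unacknowledged messages per subscription.
METRIC_TYPE = "pubsub.googleapis.com/subscription/num_undelivered_messages"


def backlog_filter(subscription_id: str) -> str:
    """Build a Monitoring filter for one subscription's backlog (hypothetical helper)."""
    return (
        f'metric.type = "{METRIC_TYPE}" '
        f'AND resource.type = "pubsub_subscription" '
        f'AND resource.labels.subscription_id = "{subscription_id}"'
    )


# "my-sub" is a placeholder subscription ID.
print(backlog_filter("my-sub"))
```

A string like this can be used wherever Monitoring accepts a time series filter, for example when listing time series through the API.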

Kamal Aboul-Hosn answered Sep 29 '22 16:09


This might help if you're looking into a programmatic way to achieve this:

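from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = "my-project"
client = monitoring_v3.MetricServiceClient()

# Fetch the last 60 minutes of backlog samples for the project.
# as_dataframe() requires pandas to be installed.
result = query.Query(
    client,
    project,
    'pubsub.googleapis.com/subscription/num_undelivered_messages',
    minutes=60).as_dataframe()

# The DataFrame columns are keyed by resource type, project ID and subscription ID;
# replace 'subscription_name' with the ID of your subscription.
print(result['pubsub_subscription'][project]['subscription_name'][0])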
Steeve answered Oct 03 '22 16:10


The answer to your question is "no": PubSub itself has no feature that shows these counts. You have to get them via metric monitoring in Stackdriver (it took me some time to find that out too).

In practice, do the following, step by step:

  1. Navigate from the GCloud Admin Console to: Monitoring
  2. This opens a new window with a separate Stackdriver console
  3. Navigate in Stackdriver: Dashboards > Create Dashboard
  4. Click the Add Chart button at the top right of the dashboard screen
  5. In the input box, type num_undelivered_messages and then SAVE

Mike S. answered Sep 29 '22 16:09


Updated version based on @Steeve's answer, without the pandas dependency.

Please note that you have to pass end_time explicitly instead of relying on the default utcnow().

import datetime
from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

project = 'my-project'
sub_name = 'my-sub'
client = monitoring_v3.MetricServiceClient()

# Query the last minute of backlog samples, restricted to one subscription.
result = query.Query(
  client,
  project,
  'pubsub.googleapis.com/subscription/num_undelivered_messages',
  end_time=datetime.datetime.now(),
  minutes=1,
  ).select_resources(subscription_id=sub_name)

# Each item is one time series; print the first point of each.
for content in result:
  print(content.points[0].value.int64_value)
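Since the original goal was to scale the subscriber count, the backlog value read this way could feed a simple sizing rule. The following is only a sketch under assumptions: the function name, the per-subscriber target of 1000 messages, and the minimum and maximum bounds are hypothetical and not anything Pub/Sub prescribes.

```python
import math


def desired_subscribers(backlog: int, per_subscriber: int = 1000,
                        min_count: int = 1, max_count: int = 10) -> int:
    """Return how many subscribers to run for a given backlog (hypothetical rule)."""
    if per_subscriber <= 0:
        raise ValueError("per_subscriber must be positive")
    # One subscriber per `per_subscriber` undelivered messages, rounded up.
    needed = math.ceil(backlog / per_subscriber)
    # Clamp the result to the allowed range.
    return max(min_count, min(max_count, needed))


# Sample backlog values, chosen only for illustration.
for backlog in (0, 36, 4500, 50000):
    print(backlog, "->", desired_subscribers(backlog))
```

The thresholds would need tuning against how fast each subscriber actually drains messages.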
northtree answered Sep 29 '22 16:09


Here is a Java version:

package com.example.monitoring;

import static com.google.cloud.monitoring.v3.MetricServiceClient.create;
import static com.google.monitoring.v3.ListTimeSeriesRequest.newBuilder;
import static com.google.monitoring.v3.ProjectName.of;
import static com.google.protobuf.util.Timestamps.fromMillis;
import static java.lang.System.currentTimeMillis;

import com.google.monitoring.v3.ListTimeSeriesRequest;
import com.google.monitoring.v3.TimeInterval;

public class ReadMessagesFromGcp {

  public static void main(String... args) throws Exception {

    String projectId = "put here";

    // Look at the last two minutes of samples.
    long now = currentTimeMillis();
    var interval = TimeInterval.newBuilder()
                               .setStartTime(fromMillis(now - (120 * 1000)))
                               .setEndTime(fromMillis(now))
                               .build();

    var request = newBuilder().setName(of(projectId).toString())
           .setFilter("metric.type=\"pubsub.googleapis.com/subscription/num_undelivered_messages\"")
           .setInterval(interval)
           .setView(ListTimeSeriesRequest.TimeSeriesView.FULL)
           .build();

    // Close the client when done so its resources are released.
    try (var client = create()) {
      var response = client.listTimeSeries(request);

      // One time series is returned per subscription.
      for (var subscriptionData : response.iterateAll()) {
        var subscription = subscriptionData.getResource().getLabelsMap().get("subscription_id");
        // Points are listed in reverse time order, so index 0 is the latest sample.
        var numberOfMessages = subscriptionData.getPointsList().get(0).getValue().getInt64Value();

        if (numberOfMessages > 0) {
          System.out.println(subscription + " has " + numberOfMessages + " messages ");
        }
      }
    }
  }
}

Maven dependencies:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-monitoring</artifactId>
  <version>3.3.2</version>
</dependency>

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java-util</artifactId>
  <version>4.0.0-rc-2</version>
</dependency>

output

queue-1 has 36 messages

queue-2 has 4 messages

queue-3 has 3 messages

Bruno Lee answered Oct 03 '22 16:10