Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Google Pub/Sub Java examples

I'm not able to find a way to read messages from pub/sub using java.

I'm using this maven dependency in my pom

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

I implemented this main method to create a new topic:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

The above code works well and, indeed, I can see the new topic I created using the google cloud console.

I implemented the following main method to write a message to my topic:

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not Sended!");
            e.printStackTrace();
        }
}

Now I'm not able to verify if this message was really sent. I would like to implement a message reader using a subscription to my topic. Could someone show me a correct and working java example about reading messages from a topic?

Anyone can help me? Thanks in advance!

like image 204
Andrea Zonzin Avatar asked Jun 11 '17 15:06

Andrea Zonzin


2 Answers

I haven't used google cloud client libraries but used the api client libraries. Here is how I created a subscription.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Create;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Topics;
import com.google.api.services.pubsub.model.ExpirationPolicy;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
import com.techm.datapipeline.factory.PubsubFactory;

public class CreatePullSubscriberClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String TOPIC_NAME = "yourTopicName";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String topicName = String.format("projects/%s/topics/%s", PROJECT_NAME, TOPIC_NAME);
        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);

        Topics.Get listReq = pubSub.projects().topics().get(topicName);
        Topic topic = listReq.execute();

        if (topic == null) {
            System.err.println("Topic doesn't exist...run CreateTopicClient...to create the topic");
            System.exit(0);
        }

        Subscription subscription = null;
        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            subscription = getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName + " does not exist...will create it");
            }
        }

        if (subscription != null) {
            System.out.println("Subscription already exists ==> " + subscription.toPrettyString());
            System.exit(0);
        }

        subscription = new Subscription();

        subscription.setTopic(topicName);
        subscription.setPushConfig(null); // indicating a pull

        ExpirationPolicy expirationPolicy = new ExpirationPolicy();
        expirationPolicy.setTtl(null); // never expires;
        subscription.setExpirationPolicy(expirationPolicy);

        subscription.setAckDeadlineSeconds(null); // so defaults to 10 sec

        subscription.setRetainAckedMessages(true);

        Long _week = 7L * 24 * 60 * 60;
        subscription.setMessageRetentionDuration(String.valueOf(_week)+"s");

        subscription.setName(subscriptionName);

        Create createReq = pubSub.projects().subscriptions().create(subscriptionName, subscription);
        Subscription createdSubscription = createReq.execute();

        System.out.println("Subscription created ==> " + createdSubscription.toPrettyString());
    }

}

And once you create the subscription (pull type)...this is how you pull the messages from the topic.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.util.Base64;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Acknowledge;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Pull;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
import com.google.api.services.pubsub.model.Empty;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import com.techm.datapipeline.factory.PubsubFactory;

public class PullSubscriptionsClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    private final static String SUBSCRIPTION_NYC_NAME = "test";


    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);
        //String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NYC_NAME);

        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName
                        + " does not exist...run CreatePullSubscriberClient to create");
            }
        }

        PullRequest pullRequest = new PullRequest();
        pullRequest.setReturnImmediately(false); // wait until you get a message
        pullRequest.setMaxMessages(1000);

        Pull pullReq = pubSub.projects().subscriptions().pull(subscriptionName, pullRequest);
        PullResponse pullResponse = pullReq.execute();

        List<ReceivedMessage> msgs = pullResponse.getReceivedMessages();
        List<String> ackIds = new ArrayList<String>();
        int i = 0;
        if (msgs != null) {
            for (ReceivedMessage msg : msgs) {
                ackIds.add(msg.getAckId());
                //System.out.println(i++ + ":===:" + msg.getAckId());
                String object = new String(Base64.decodeBase64(msg.getMessage().getData()));
                System.out.println("Decoded object String ==> " + object );
            }

            //acknowledge all the received messages
            AcknowledgeRequest content = new AcknowledgeRequest();
            content.setAckIds(ackIds);
            Acknowledge ackReq = pubSub.projects().subscriptions().acknowledge(subscriptionName, content);
            Empty empty = ackReq.execute();
        }

    }

}

Note: This client only waits until it receives at least one message and terminates if it's receives one (up to a max of value - set in MaxMessages) at once.

Let me know if this helps. I'm going to try the cloud client libraries soon and will post an update once I get my hands on them.

And here's the missing factory class ...if you plan to run it...

package com.techm.datapipeline.factory;


import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

public class PubsubFactory {

    private static Pubsub instance = null;
    private static final Logger logger = Logger.getLogger(PubsubFactory.class.getName());

    public static synchronized Pubsub getService() throws IOException, GeneralSecurityException {
        if (instance == null) {
            instance = buildService();
        }
        return instance;
    }

    private static Pubsub buildService() throws IOException, GeneralSecurityException {
        logger.log(Level.FINER, "Start of buildService");
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        // Depending on the environment that provides the default credentials (for
        // example: Compute Engine, App Engine), the credentials may require us to
        // specify the scopes we need explicitly. 
        if (credential.createScopedRequired()) {
            Collection<String> scopes = new ArrayList<>();
            scopes.add(PubsubScopes.PUBSUB);
            credential = credential.createScoped(scopes);
        }

        logger.log(Level.FINER, "End of buildService");

        // TODO - Get the application name from outside.
        return new Pubsub.Builder(transport, jsonFactory, credential).setApplicationName("Your Application Name/Version")
                .build();
    }

}
like image 88
Kishore Namala Avatar answered Oct 21 '22 23:10

Kishore Namala


The Cloud Pub/Sub Pull Subscriber Guide has sample code for reading messages from a topic.

like image 44
Kamal Aboul-Hosn Avatar answered Oct 22 '22 00:10

Kamal Aboul-Hosn