
Batching PubSub requests

The Node.js example code for batching Pub/Sub requests looks like this:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

It is not clear to me how to use this example to publish multiple messages at once. Could someone explain how to adjust this code so that it publishes several messages in one batch?

Asked by Erik van den Hoorn on Mar 02 '18 14:03


2 Answers

If you wanted to batch messages, then you'd need to keep hold of the publisher and call publish on it multiple times. For example, you could change the code to something like this:

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();


const topicName = 'my-topic';
const maxMessages = 10;
const maxWaitTime = 10000;
const data1 = JSON.stringify({ foo: 'bar1' });
const data2 = JSON.stringify({ foo: 'bar2' });
const data3 = JSON.stringify({ foo: 'bar3' });

const publisher = pubsub.topic(topicName).publisher({
  batching: {
    maxMessages: maxMessages,
    maxMilliseconds: maxWaitTime,
  },
});

function handleResult(p) {
  p.then(results => {
    console.log(`Message ${results} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });
}

// Publish three messages
handleResult(publisher.publish(Buffer.from(data1)));
handleResult(publisher.publish(Buffer.from(data2)));
handleResult(publisher.publish(Buffer.from(data3)));

Batching of messages is controlled by the maxMessages and maxMilliseconds properties. The former is the maximum number of messages to include in a batch; the latter is the maximum number of milliseconds to wait before publishing a batch. Together they trade off larger batches (which can be more efficient) against publish latency. If you are publishing many messages rapidly, maxMilliseconds won't have much effect: as soon as ten messages (the maxMessages value) are ready, the client library makes a publish request to the Cloud Pub/Sub service. If publishing is sporadic or slow, however, a batch may be sent before it contains ten messages.

In the example code above, we call publish for three messages. That is not enough to fill a batch, so the three messages are sent to Cloud Pub/Sub as a single batch 10,000 milliseconds after the first call to publish.
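The code above uses the older `topic(...).publisher(...)` API. In recent versions of `@google-cloud/pubsub`, the batching options are passed to `pubsub.topic(name, options)` (as in the next answer), and the same idea, many publish calls on one batching topic, can be written with `Promise.all`. The sketch below assumes the topic object exposes `publishMessage({ data })` returning a Promise of the message ID; `publishAll` is a hypothetical helper, not part of the library.

```javascript
// Hypothetical helper: publish several payloads on the same topic and wait
// for all message IDs. `topic` is assumed to expose publishMessage({ data })
// returning a Promise of the message ID, as Topic does in recent
// @google-cloud/pubsub versions. The batching options configured on the
// topic decide how these calls are grouped into publish requests.
async function publishAll(topic, payloads) {
  const ids = await Promise.all(
    payloads.map((payload) =>
      topic.publishMessage({ data: Buffer.from(JSON.stringify(payload)) })
    )
  );
  return ids; // same order as `payloads`
}

// Usage with the real client (assumes credentials and an existing topic):
// const { PubSub } = require('@google-cloud/pubsub');
// const topic = new PubSub().topic('my-topic', {
//   batching: { maxMessages: 10, maxMilliseconds: 10000 },
// });
// publishAll(topic, [{ foo: 'bar1' }, { foo: 'bar2' }, { foo: 'bar3' }])
//   .then((ids) => console.log('published', ids))
//   .catch((err) => console.error('ERROR:', err));
```

Because every `publishMessage` call is made before any of them resolves, all three messages land in the same pending batch, just as the three `publisher.publish` calls above do.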

Answered by Kamal Aboul-Hosn on Oct 31 '22 06:10


Batching explanation:

  1. If the number of messages waiting to be published reaches maxMessages, the maxMilliseconds option is ignored and a batch of maxMessages messages is published immediately.

  2. If the number of messages waiting to be published does not reach maxMessages, they are sent together as one batch once maxMilliseconds has elapsed.
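The two rules above can be illustrated with a small size-or-time batcher. `Batcher` is a hypothetical class written only for this explanation, not the `@google-cloud/pubsub` implementation. It assumes, as the logs in this answer suggest, that the `maxMilliseconds` timer starts when the first message of a new batch is added. The real library also enforces other limits (such as `maxBytes`), which are left out here.

```javascript
// Hypothetical size-or-time batcher illustrating the two rules above.
// It is NOT the @google-cloud/pubsub implementation.
class Batcher {
  constructor({ maxMessages, maxMilliseconds }, sendBatch) {
    this.maxMessages = maxMessages;
    this.maxMilliseconds = maxMilliseconds;
    this.sendBatch = sendBatch; // called with an array of messages
    this.pending = [];
    this.timer = null;
  }

  add(message) {
    this.pending.push(message);
    if (this.pending.length >= this.maxMessages) {
      // Rule 1: the batch is full, so send it now without waiting for the timer.
      this.flush();
    } else if (this.timer === null) {
      // Rule 2: the first message of a new batch starts the maxMilliseconds timer.
      this.timer = setTimeout(() => this.flush(), this.maxMilliseconds);
    }
  }

  flush() {
    if (this.timer !== null) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.sendBatch(batch);
  }
}

// Usage: 12 messages, maxMessages = 10, maxMilliseconds shortened to 100 ms.
const start = Date.now();
const batcher = new Batcher({ maxMessages: 10, maxMilliseconds: 100 }, (batch) => {
  console.log(`+${Date.now() - start} ms: sent ${batch.length} messages`);
});
for (let i = 0; i < 12; i++) batcher.add(`message payload ${i}`);
```

Shortening `maxMilliseconds` keeps the demo quick; the printed pattern should mirror the first example below: ten messages sent right away, the last two roughly 100 ms later when the timer fires.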

Example for case 1:

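import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID must be defined elsewhere (for example, read from an environment variable).
async function publishMessage(topicName: string) {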
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  const n = 12;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}

Here we publish 12 messages. The execution result:

[2020-05-05T09:09:41.847Z] publishing messages
[2020-05-05T09:09:41.955Z] Message 36832 published. index: 0
[2020-05-05T09:09:41.955Z] Message 36833 published. index: 1
[2020-05-05T09:09:41.955Z] Message 36834 published. index: 2
[2020-05-05T09:09:41.955Z] Message 36835 published. index: 3
[2020-05-05T09:09:41.955Z] Message 36836 published. index: 4
[2020-05-05T09:09:41.955Z] Message 36837 published. index: 5
[2020-05-05T09:09:41.955Z] Message 36838 published. index: 6
[2020-05-05T09:09:41.955Z] Message 36839 published. index: 7
[2020-05-05T09:09:41.955Z] Message 36840 published. index: 8
[2020-05-05T09:09:41.955Z] Message 36841 published. index: 9
[2020-05-05T09:09:51.939Z] Message 36842 published. index: 10
[2020-05-05T09:09:51.939Z] Message 36843 published. index: 11
results: 36832,36833,36834,36835,36836,36837,36838,36839,36840,36841,36842,36843

Note the timestamps. The first 10 messages are published immediately because they reach the number specified by maxMessages. The remaining 2 messages do not reach maxMessages, so Pub/Sub waits 10 seconds (maxMilliseconds) and then sends them.

Example for case 2:

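import { PubSub } from '@google-cloud/pubsub';

// PUBSUB_PROJECT_ID must be defined elsewhere (for example, read from an environment variable).
async function publishMessage(topicName: string) {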
  console.log(`[${new Date().toISOString()}] publishing messages`);
  const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
  const topic = pubsub.topic(topicName, {
    batching: {
      maxMessages: 10,
      maxMilliseconds: 10 * 1000,
    },
  });

  const n = 5;
  const dataBufs: Buffer[] = [];
  for (let i = 0; i < n; i++) {
    const data = `message payload ${i}`;
    const dataBuffer = Buffer.from(data);
    dataBufs.push(dataBuffer);
  }

  const results = await Promise.all(
    dataBufs.map((dataBuf, idx) =>
      topic.publish(dataBuf).then((messageId) => {
        console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
        return messageId;
      })
    )
  );
  console.log('results:', results.toString());
}

Here we send 5 messages, which do not reach the number specified by maxMessages. Pub/Sub therefore waits 10 seconds (maxMilliseconds) and then sends all 5 messages in one batch. This is the same situation as the remaining 2 messages in the first example. The execution result:

[2020-05-05T09:10:16.857Z] publishing messages
[2020-05-05T09:10:26.977Z] Message 36844 published. index: 0
[2020-05-05T09:10:26.977Z] Message 36845 published. index: 1
[2020-05-05T09:10:26.977Z] Message 36846 published. index: 2
[2020-05-05T09:10:26.977Z] Message 36847 published. index: 3
[2020-05-05T09:10:26.977Z] Message 36848 published. index: 4
results: 36844,36845,36846,36847,36848
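Both runs above follow from a quick calculation when all messages are published at about the same moment. `predictBatches` is a hypothetical helper for this explanation only. `sentAfterMs` is the delay before a batch is handed to the service, so the logged timestamps additionally include request latency (roughly 0.1 s in the first run).

```javascript
// Hypothetical helper: for n messages published at (almost) the same instant,
// predict the batches under the size-or-time rule described in this answer.
function predictBatches(n, maxMessages, maxMilliseconds) {
  if (!(maxMessages > 0)) throw new RangeError('maxMessages must be positive');
  const batches = [];
  let remaining = n;
  // Rule 1: every full group of maxMessages is sent immediately.
  while (remaining >= maxMessages) {
    batches.push({ size: maxMessages, sentAfterMs: 0 });
    remaining -= maxMessages;
  }
  // Rule 2: a partial batch waits maxMilliseconds before being sent.
  if (remaining > 0) {
    batches.push({ size: remaining, sentAfterMs: maxMilliseconds });
  }
  return batches;
}

console.log(JSON.stringify(predictBatches(12, 10, 10 * 1000)));
// → [{"size":10,"sentAfterMs":0},{"size":2,"sentAfterMs":10000}]
console.log(JSON.stringify(predictBatches(5, 10, 10 * 1000)));
// → [{"size":5,"sentAfterMs":10000}]
```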

Answered by slideshowp2 on Oct 31 '22 06:10