Google PubSub and auto-scaling Compute Engine instances (Python)

In my scenario I'm scheduling tasks via Pub/Sub: up to 2,000 Pub/Sub messages that are then consumed by a Python script running inside a Docker container on Google Compute Engine.

Processing a single message takes about 30 seconds to 5 minutes, so the acknowledgement deadline of the subscription is set to 600 seconds (10 minutes).
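
The subscription itself could be created roughly like this (topic and subscription names are just examples):

gcloud pubsub subscriptions create my-pubsub-subscription \
  --topic=my-task-topic \
  --ack-deadline=600

The worker script that consumes from this subscription then looks like this: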

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    # do your stuff here (max. 600sec)
    message.ack()
    return

def receive_messages(project, subscription_name):

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)

    future = subscription.open(handle_message)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    try:
        future.result()
    except Exception as e:
        # do some logging
        raise
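
For completeness, here is how this worker could be started and how the roughly 2,000 task messages might be published in the first place. This continues the script above; publish_tasks, the project, topic and subscription names are only illustrative:

def publish_tasks(project, topic_name, num_tasks=2000):
    # Publish one small message per task; the payload here is just a task id.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)
    futures = [
        publisher.publish(topic_path, data=str(task_id).encode("utf-8"))
        for task_id in range(num_tasks)
    ]
    for future in futures:
        future.result()  # wait until every message has been accepted by Pub/Sub

if __name__ == "__main__":
    receive_messages("my-project", "my-pubsub-subscription")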

Because I'm working through so many Pub/Sub messages, I created an instance template and a managed instance group on Compute Engine that auto-scales in either of these two ways:

gcloud compute instance-groups managed create my-worker-group \
  --zone=europe-west3-a \
  --template=my-worker-template \
  --size=0

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --target-cpu-utilization=0.4

gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
  --zone=europe-west3-a \
  --max-num-replicas=50 \
  --min-num-replicas=0 \
  --update-stackdriver-metric=pubsub.googleapis.com/subscription/num_undelivered_messages \
  --stackdriver-metric-filter="resource.type = pubsub_subscription AND resource.label.subscription_id = my-pubsub-subscription" \
  --stackdriver-metric-single-instance-assignment=10
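
The instance template referenced above is not shown; it could be created with something along these lines (machine type and container image are just assumptions):

gcloud compute instance-templates create-with-container my-worker-template \
  --machine-type=n1-standard-1 \
  --container-image=eu.gcr.io/my-project/my-worker:latest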

So far, so good. Option one scales up to about 8 instances, while option two starts the maximum number of instances. Then I noticed some strange behaviour, which is why I'm posting here. Maybe you can help me out?!

Message duplicates: It seems that the Pub/Sub client in each instance (the Python script inside the Docker container on Compute Engine) reads a batch of messages (~10) into a kind of buffer and hands them to my code. It looks like all instances that spin up at the same time read the very same messages (the first 10 of the 2,000) and start working on the same tasks. In my logs I see that most messages are processed three times by different machines. I was expecting Pub/Sub to know that one subscriber has already buffered 10 messages, so that another subscriber would buffer 10 different ones rather than the same ones.
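
A minimal sketch of how such duplicate deliveries can be made visible in the logs; the handler body is only a placeholder, and message_id is set by Pub/Sub:

import logging
import socket

from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    # Logging the Pub/Sub message id together with the host name makes
    # duplicate deliveries across instances easy to spot in aggregated logs.
    logging.info("host=%s message_id=%s", socket.gethostname(), message.message_id)
    # ... actual processing here ...
    message.ack()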

Acknowledgement deadline: Because of this buffering, a message towards the end of the buffer (say message 8 or 9) has to wait until the preceding messages (1 to 7) have been processed. With up to 5 minutes per message, it can sit in the buffer for over half an hour before its own processing even starts, so the waiting time plus its own processing time may well exceed the 600-second deadline.

Load balancing: Because each machine buffers so many messages, the load is handled by just a few instances while the other instances sit completely idle. This happens with scaling option two, which uses the Pub/Sub Stackdriver metric.

People told me that I need to implement a manual synchronization service using Cloud SQL or something similar, in which each instance records which message it is working on so that other instances don't start on the same one. But that doesn't feel right; if it were true, I wouldn't see what Pub/Sub is all about.

[Figure: observed Pub/Sub behavior]

Update: I found a nice explanation from 2015 by Gregor Hohpe, co-author of the book Enterprise Integration Patterns. It turns out my observation was wrong, but the side effects I observed are real.

Google Cloud Pub/Sub API actually implements both the Publish-Subscribe Channel and the Competing Consumers patterns. At the heart of the Cloud Pub/Sub is a classic Publish-Subscribe Channel, which delivers a single message published to it to multiple subscribers. One advantage of this pattern is that adding subscribers is side-effect free, which is one reason a Publish-Subscribe Channel is sometimes considered more loosely coupled than a Point-to-Point Channel, which delivers a message to exactly one subscriber. Adding consumers to a Point-to-Point Channel results in Competing Consumers and thus has a strong side effect.

Copyright: Gregor Hohpe, co-author of the book Enterprise Integration Patterns. 2015.

The side effects I observed concern the message buffering and flow control in each of the subscribers (which all subscribe to the same subscription, i.e. point-to-point == competing consumers). The current version of the Python client library wraps the Pub/Sub REST API (and RPCs). If that wrapper is used, there is no control over:

  • How many containers are started on one VM; multiple containers may be started if the CPU is not yet fully utilized
  • How many messages are pulled from the subscription at once (buffering); there is no control at all
  • How many threads are started inside one container for processing the pulled messages; flow_control(max_messages) has no effect if the value is below a fixed minimum (see the flow-control sketch below this list)
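
For reference, this is roughly how flow control is configured in the Python client; whether it actually caps the client-side buffer depends on the library version, as noted above (project and subscription names are just examples):

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-pubsub-subscription")

# Ask the client library to hold at most one message per subscriber.
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)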

The side effects we observed are:

  1. One consumer pulls a large number of messages at once (approximately 100 to 1,000) and queues them in its client-side buffer. As a result, the other VMs started by the auto-scaling rule do not receive any messages, because everything is sitting in the queues of the first few VMs.
  2. Messages are re-delivered, either to the same VM or to another VM (or Docker container), if processing runs into the acknowledgement deadline. Therefore you need to modify the acknowledgement deadline while processing the message. The deadline counter starts when the processing starts.
  3. Assuming that processing a message is a long-running task (for instance machine learning), you may either
    • Acknowledge the message up front. This will cause the VM to be shut down by the auto-scaling rule if no further message is waiting; the rule does not care whether CPU utilization is still high and processing has not finished yet.
    • Acknowledge the message after processing. In that case you need to modify the acknowledgement deadline of that specific message while it is being processed; no single block of code may run longer than the deadline since the last modification (see the sketch after this list).
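
A minimal sketch of the second option, extending the acknowledgement deadline from a background thread while the work runs; run_long_task is a hypothetical placeholder for the actual processing:

import threading

from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    done = threading.Event()

    def keep_alive():
        # Every 5 minutes, reset this message's ack deadline to 600s from now,
        # until processing has finished.
        while not done.wait(300):
            message.modify_ack_deadline(600)

    threading.Thread(target=keep_alive, daemon=True).start()
    try:
        run_long_task(message.data)  # hypothetical long-running work
        message.ack()
    finally:
        done.set()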

Possible solutions that I haven't looked into yet:

  • Using the Java client library, since it comes with better controls for pulling and consuming messages
  • Using the underlying API calls and classes of the Python client library, for example synchronous pull (see the sketch after this list)
  • Building a synchronization storage that orchestrates the competing consumers
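
As a rough illustration of the second point, a worker could use the synchronous pull API so that it leases only one message at a time; names are examples and the exact call signature differs between client-library versions:

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-pubsub-subscription")

while True:
    # Lease at most one message, so this worker never buffers work
    # that another instance could be doing.
    response = subscriber.pull(subscription_path, max_messages=1)
    for received in response.received_messages:
        process(received.message.data)  # hypothetical processing function
        subscriber.acknowledge(subscription_path, [received.ack_id])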
Asked by Matthias on Nov 07 '22


1 Answer

I think there are two main ways you can deal with this.

1) Instead of pushing to your worker processes directly, push to a load balancer.

or

2) Have your worker processes pull the messages rather than having them pushed to the workers.

See the "Load Balancing" section under "Push and Pull delivery" in

https://cloud.google.com/pubsub/docs/subscriber
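
For option 1, a push subscription pointed at a load-balanced HTTPS endpoint could look roughly like this (topic, subscription and endpoint are just examples):

gcloud pubsub subscriptions create my-push-subscription \
  --topic=my-task-topic \
  --push-endpoint=https://my-load-balancer.example.com/pubsub/push \
  --ack-deadline=600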

Answered by Matt Wilbert on Nov 14 '22