In my scenario I'm scheduling tasks via PubSub: up to 2,000 PubSub messages are published and then consumed by a Python script that runs inside a Docker container on Google Compute Engine.
Processing a single message takes between 30 seconds and 5 minutes, which is why the acknowledgement deadline of the subscription is set to 600 seconds (10 minutes).
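For reference, a minimal sketch of how such a subscription with a 600-second acknowledgement deadline can be created with the Python client (the project, topic and subscription names are placeholders, and the exact signature may differ between client library versions):

from google.cloud import pubsub_v1

project = "my-project"                        # placeholder
topic_name = "my-worker-topic"                # placeholder
subscription_name = "my-pubsub-subscription"  # placeholder

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project, topic_name)
subscription_path = subscriber.subscription_path(project, subscription_name)

# Give each worker up to 600 seconds before an unacknowledged message
# is redelivered.
subscriber.create_subscription(
    name=subscription_path,
    topic=topic_path,
    ack_deadline_seconds=600,
)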
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

def handle_message(message: Message):
    # do your stuff here (max. 600sec)
    message.ack()
    return

def receive_messages(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    flow_control = pubsub_v1.types.FlowControl(max_messages=5)
    subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)
    future = subscription.open(handle_message)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    try:
        future.result()
    except Exception as e:
        # do some logging
        raise
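The worker then just calls receive_messages with the project and subscription (the names below are placeholders):

if __name__ == "__main__":
    # Blocks until the streaming subscription raises an exception.
    receive_messages("my-project", "my-pubsub-subscription")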
Because I'm working on so many PubSub messages, I create a managed instance group from a Compute Engine instance template and enable auto-scaling in either of these two ways:
gcloud compute instance-groups managed create my-worker-group \
--zone=europe-west3-a \
--template=my-worker-template \
--size=0
gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
--zone=europe-west3-a \
--max-num-replicas=50 \
--min-num-replicas=0 \
--target-cpu-utilization=0.4
gcloud beta compute instance-groups managed set-autoscaling my-worker-group \
--zone=europe-west3-a \
--max-num-replicas=50 \
--min-num-replicas=0 \
--update-stackdriver-metric=pubsub.googleapis.com/subscription/num_undelivered_messages \
--stackdriver-metric-filter="resource.type = pubsub_subscription AND resource.label.subscription_id = my-pubsub-subscription" \
--stackdriver-metric-single-instance-assignment=10
So far, so good. Option one scales up to about 8 instances, while option two starts the maximum number of instances. Now I have noticed that some strange things happen, and this is why I'm posting here. Maybe you can help me out?!
Message duplicates: It seems that the PubSub client in each instance (the Python script inside the Docker container on Compute Engine) reads a batch of messages (~10) into something like a buffer and hands them to my code. It looks like all instances that spin up at the same time read the very same messages (the first 10 of the 2,000) and start working on the same stuff. In my logs I see that most messages are processed 3 times by different machines. I was expecting PubSub to know that one subscriber has buffered 10 messages, so that another subscriber would buffer 10 different messages instead of the same ones.
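To make that visible in the logs, the handler could log the PubSub message ID, which is assigned once at publish time; the same ID appearing on several machines confirms a redelivery of the same message (a small, untested addition to handle_message above):

import logging

def handle_message(message: Message):
    # message_id is assigned by PubSub at publish time, so the same ID showing
    # up on different machines means the same message was delivered more than once.
    logging.info("working on message %s", message.message_id)
    # do your stuff here (max. 600sec)
    message.ack()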
Acknowledgement deadline: Because of the buffering, the messages at the end of the buffer (let's say message 8 or 9) have to wait until the preceding messages (1 to 7) have been processed. The sum of that waiting time plus its own processing time may run into the 600-second deadline.
Load balancing: Because each machine buffers so many messages, the load is consumed by just a few instances while the other instances stay completely idle. This happens with scaling option two, which uses the PubSub Stackdriver metric.
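One mitigation for the last two points would be to shrink the per-worker buffer so that each instance only leases a single message at a time; a sketch of that change inside receive_messages above (I have not verified how much it helps):

# Lease at most one message per worker so a slow message does not block
# the others in a local buffer and idle instances can pick up the backlog.
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
subscription = subscriber.subscribe(subscription_path, flow_control=flow_control)
future = subscription.open(handle_message)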
People told me that I need to implement a manual synchronization service using Cloud SQL or something similar, in which each instance records which message it is working on so that other instances won't start on the same one. But I feel that can't be true - because then I don't get the idea of what PubSub is all about.
Update: I found a nice explanation from 2015 by Gregor Hohpe, co-author of the book Enterprise Integration Patterns. Actually my observation was wrong, but the observed side effects are real.
Google Cloud Pub/Sub API actually implements both the Publish-Subscribe Channel and the Competing Consumers patterns. At the heart of the Cloud Pub/Sub is a classic Publish-Subscribe Channel, which delivers a single message published to it to multiple subscribers. One advantage of this pattern is that adding subscribers is side-effect free, which is one reason a Publish-Subscribe Channel is sometimes considered more loosely coupled than a Point-to-Point Channel, which delivers a message to exactly one subscriber. Adding consumers to a Point-to-Point Channel results in Competing Consumers and thus has a strong side effect.
The side effects I observed are about the message buffering and message flow control in each of the subscribers (which subscribed to the same subscription; point-to-point == competing consumers). The current version of the Python client library wraps the PubSub REST API (and RPCs). If that wrapper is used, there is no control over:
The side effects we observed are:
Possible solutions that haven't been looked into:
I think there are two main ways you can deal with this.
1) Instead of pushing to your worker processes directly, push to a load balancer.
or
2) Have your worker processes pull the requests rather than having the requests pushed to them.
See the "Load Balancing" section under "Push and Pull delivery" in
https://cloud.google.com/pubsub/docs/subscriber
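As a rough sketch of option 2, each worker could synchronously pull a single message, process it and acknowledge it (the exact method signatures differ between client library versions, so check them against the current docs):

from google.cloud import pubsub_v1

def pull_one(project, subscription_name):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project, subscription_name)

    # Pull a single message so each worker only takes what it can process.
    response = subscriber.pull(subscription=subscription_path, max_messages=1)

    for received in response.received_messages:
        # ... process received.message.data here (max. 600sec) ...
        subscriber.acknowledge(
            subscription=subscription_path,
            ack_ids=[received.ack_id],
        )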