I am trying to establish a long-running pull subscription to a Google Cloud Pub/Sub topic. I am using code very similar to the example given in the documentation, i.e.:

    import time

    from google.cloud import pubsub_v1


    def receive_messages(project, subscription_name):
        """Receives messages from a pull subscription."""
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(
            project, subscription_name)

        def callback(message):
            print('Received message: {}'.format(message))
            message.ack()

        subscriber.subscribe(subscription_path, callback=callback)

        # The subscriber is non-blocking, so we must keep the main thread from
        # exiting to allow it to process messages in the background.
        print('Listening for messages on {}'.format(subscription_path))
        while True:
            time.sleep(60)

The problem is that I'm receiving the following traceback sometimes:

    Exception in thread Consumer helper: consume bidirectional stream:
    Traceback (most recent call last):
      File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.5/threading.py", line 862, in run
        self._target(*self._args, **self._kwargs)
      File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume
        self._policy.on_exception(exc)
      File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception
        raise exception
      File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume
        for response in response_generator:
      File "/path/to/grpc/_channel.py", line 348, in __next__
        return self._next()
      File "/path/to/grpc/_channel.py", line 342, in _next
        raise self
    grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>

I saw that this was referenced in another question, but here I am asking how to handle it properly in Python. I have tried to wrap the request in a try/except block, but it seems to run in the background and I am not able to retry in case of that error.
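
To illustrate why a try/except around the call never fires, here is a minimal sketch using plain threads, with no Pub/Sub involved. The names failing_consumer and run_in_background are hypothetical stand-ins, not part of the library; the point is only that an exception raised inside a background thread does not propagate to the thread that started it.

```python
import threading


def failing_consumer():
    # Hypothetical stand-in for the library's background consumer thread.
    raise RuntimeError("simulated UNAVAILABLE")


def run_in_background(target, errors):
    """Start target in a thread, recording any exception it raises."""
    def wrapper():
        try:
            target()
        except Exception as exc:
            # The exception can only be observed inside the worker thread.
            errors.append(exc)

    thread = threading.Thread(target=wrapper)
    thread.start()
    return thread


errors = []
caught_in_main = False
try:
    worker = run_in_background(failing_consumer, errors)
    worker.join()
except RuntimeError:
    # Never reached: the error was raised in the worker, not in this thread.
    caught_in_main = True
```

After this runs, caught_in_main is still False while errors holds the RuntimeError, which is why the handling has to live in code that runs on the background thread itself.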
To use Cloud Pub/Sub with your app, you need to create a Google Cloud Platform project that has the Pub/Sub API enabled. Create a new project in the Cloud Platform Console. In the left pane of the console, select Pub/Sub and then select Enable API.
To publish and receive messages from Pub/Sub in Python, you need to create a service account with the “Pub/Sub Publisher” and “Pub/Sub Subscriber” roles. After the service account is created, remember to create a JSON key for it, which will be used for authentication in the Python code later.
There is no limit on the number of retained messages. If subscribers don't use a subscription, the subscription expires. The default expiration period is 31 days.
Pub/Sub is not compliant with AMQP. Some products use a similar protocol, such as MQTT in the IoT Core product, but once a message is in Pub/Sub, only HTTP and gRPC communication are supported.
A somewhat hacky approach that is working for me is a custom policy_class. The default one has an on_exception function that ignores DEADLINE_EXCEEDED. You can make a class that inherits from the default and also ignores UNAVAILABLE. Mine looks like this:

    from google.cloud import pubsub
    from google.cloud.pubsub_v1.subscriber.policy import thread
    import grpc


    class AvailablePolicy(thread.Policy):
        def on_exception(self, exception):
            """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE.

            I'm not sure what triggers that error, but if you ignore it, your
            subscriber seems to work just fine. It's probably an intermittent
            thing and it reconnects later if you just give it a chance.
            """
            # If this is UNAVAILABLE, then we want to retry.
            # That entails just returning None.
            unavailable = grpc.StatusCode.UNAVAILABLE
            if getattr(exception, 'code', lambda: None)() == unavailable:
                return
            # For anything else, fallback on super.
            super(AvailablePolicy, self).on_exception(exception)


    subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy)
    # Continue to set up as normal.

It looks a lot like the original on_exception; it just ignores a different error. If you want, you can add some logging whenever the exception is raised and verify that everything still works. Future messages will still come through.
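
For the logging idea, a minimal sketch might look like the following. It is deliberately library-free so it can be run on its own: should_ignore, is_unavailable, FakeStatus and FakeRpcError are all hypothetical names introduced for illustration, and the check compares the status by its enum name (grpc.StatusCode members are enum values whose name is, for example, 'UNAVAILABLE') rather than importing grpc.

```python
import enum
import logging

logger = logging.getLogger(__name__)


def is_unavailable(exception):
    """Return True if the exception reports a gRPC-style UNAVAILABLE status."""
    code_fn = getattr(exception, "code", None)
    if not callable(code_fn):
        return False
    # Compare by enum member name so this sketch does not need grpc installed.
    return getattr(code_fn(), "name", None) == "UNAVAILABLE"


def should_ignore(exception):
    """Log and report whether the exception should be swallowed (retried)."""
    if is_unavailable(exception):
        logger.warning("Ignoring UNAVAILABLE from stream: %r", exception)
        return True
    return False


# Hypothetical stand-ins used only to exercise the helpers above.
class FakeStatus(enum.Enum):
    INTERNAL = 13
    UNAVAILABLE = 14


class FakeRpcError(Exception):
    def __init__(self, status):
        super().__init__(status.name)
        self._status = status

    def code(self):
        return self._status


ignored = should_ignore(FakeRpcError(FakeStatus.UNAVAILABLE))
not_ignored = should_ignore(FakeRpcError(FakeStatus.INTERNAL))
```

Inside a policy subclass like the one above, on_exception could call should_ignore(exception) and return early when it is true, deferring to super otherwise. The warning in the log then shows how often the error actually occurs.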