I am using the Java client library to subscribe to a Pub/Sub subscription from my code. My sbt dependency:
"com.google.cloud" % "google-cloud-pubsub" % "0.24.0-beta"
I followed this guide to write a subscriber: https://cloud.google.com/pubsub/docs/pull
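import com.google.cloud.pubsub.v1.{AckReplyConsumer, MessageReceiver, Subscriber}
import com.google.pubsub.v1.{PubsubMessage, SubscriptionName}

// Wrapper object (name is illustrative) so the snippet compiles on its own.
object SubscriberApp {
  val projectId = "test-topic"
  val subscriptionId = "test-sub"

  def main(args: Array[String]): Unit = {
    val subscriptionName = SubscriptionName.create(projectId, subscriptionId)
    val subscriber = Subscriber.defaultBuilder(subscriptionName, new PastEventMessageReceiver()).build()
    subscriber.startAsync()
    // Block the main thread so the subscriber keeps running until input arrives.
    System.in.read()
  }
}

class PastEventMessageReceiver extends MessageReceiver {
  override def receiveMessage(message: PubsubMessage, consumer: AckReplyConsumer): Unit = {
    println(message)
    // Acknowledge so the message is not redelivered.
    consumer.ack()
  }
}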
It works great: I am able to pull published messages, but I keep seeing this error in my log multiple times per minute.
com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1 onFailure
WARNING: Terminated streaming with exception
io.grpc.StatusRuntimeException: UNAVAILABLE: The service was unable to fulfill your request. Please try again. [code=8a75]
at io.grpc.Status.asRuntimeException(Status.java:526)
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:385)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:422)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:61)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:504)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$600(ClientCallImpl.java:425)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:536)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:102)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
When I first run the app there is a short delay (around 1-2 minutes) during which I don't see the error; after that, I see it multiple times per minute. My application still seems to be able to pull messages, though.
This message is an internal error in the Google Cloud Pub/Sub library that happens when there is a disconnection or retryable error in a request sent to the Pub/Sub server. The client library should seamlessly recreate connections and retry requests on these errors. In version 0.26.0-beta and later of the client library, these errors should no longer print out in the log unless you have the log level set to FINE. In general, messages should still continue to be sent to your MessageReceiver after this error occurs. Any error that is not retryable by the client library itself, e.g., when a subscription is not found, is propagated back to the caller.
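If upgrading is not an option yet, one possible workaround is to raise the log threshold for the class that emits the warning. The sketch below is only an illustration: it assumes the client logs through java.util.logging under a logger named after the class in the stack trace (`com.google.cloud.pubsub.v1.StreamingSubscriberConnection`), which I have not verified for every version. The object and method names (`PubsubLogLevels`, `quietStreamingWarnings`, `resetStreamingLevel`) are hypothetical helpers, not part of the library.

```scala
import java.util.logging.{Level, Logger}

object PubsubLogLevels {
  // Assumption: the logger is named after the class shown in the stack trace.
  val StreamingLoggerName = "com.google.cloud.pubsub.v1.StreamingSubscriberConnection"

  // Keep a strong reference: java.util.logging holds loggers weakly, so a
  // level set on an otherwise unreferenced logger can be lost after GC.
  private val streamingLogger: Logger = Logger.getLogger(StreamingLoggerName)

  /** Drop everything below SEVERE from this logger, including the WARNING. */
  def quietStreamingWarnings(): Unit = streamingLogger.setLevel(Level.SEVERE)

  /** Clear the override so the logger inherits its parent's level again. */
  def resetStreamingLevel(): Unit = streamingLogger.setLevel(null)

  /** True if a WARNING record would currently pass this logger's level check. */
  def isWarningLoggable: Boolean = streamingLogger.isLoggable(Level.WARNING)
}
```

Calling `PubsubLogLevels.quietStreamingWarnings()` before `subscriber.startAsync()` should be enough if the assumption about the logger name holds. Note that this hides the symptom only; the reconnects still happen, and non-retryable errors are still propagated to the caller as described above.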