If client will be disconnected by network error, server must close in my case pub/sub connection. I know about ctx.Done()
function, but don't know how to use it properly in my case. Can somebody explain please?
grpc-go: 1.7.0
go version go1.8.4
func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
}
}
You can use select
. instead of normal getting data from a function, use a channel to get data and a go routine to handle it.
Some thing like this :
func (a *API) Notifications(in *empty.Empty, stream
pb.Service_NotificationsServer) error {
ctx := stream.Context()
_, ok := user.FromContext(ctx)
if !ok {
return grpc.Errorf(codes.Unauthenticated, "user not found")
}
pubsub := a.redisClient.Subscribe("notifications")
defer pubsub.Close()
// I can not build the code, so I assume the msg in your code Message struct
c := make(chan Message)
go func() {
for {
msg, err := pubsub.ReceiveMessage()
if err != nil {
grpclog.Warningf("Notifications: pubsub error: %v", err)
close(c)
return grpc.Errorf(codes.Internal, "pubsub error %v", err)
}
c<- msg
}
}()
for {
select {
case msg, ok := <-c:
if !ok {
// channel is closed handle it
}
notification := &pb.Notification{}
err = json.Unmarshal([]byte(msg.Payload), notification)
if err != nil {
grpclog.Warningf("Notifications: parse error: %v", err)
continue
}
if err := stream.Send(notification); err != nil {
grpclog.Warningf("Notifications: %v", err)
return err
}
grpclog.Infof("Notifications: send msg %v", notification)
case <- ctx.Done():
// do exit logic. some how close the pubsub, so next
// ReceiveMessage() return an error
// if forget to do that the go routine runs for ever
// until the end of main(), which I think its not what you wanted
pubsub.Close() // Its just pseudo code
return
}
}
}
read the message (I assume the type is Message) from the channel, and use power of select
.
two other related thing in this scenario:
Make sure the go routine end after finalizing this function. I can not guess, since I don't know about the code, but I assume there is a Close()
method for closing the pubsub
so the next ReceiveMessage
return an error. (which I see the defer that do the job I hope)
if there is an error in ReceiveMessage
before ctx.Done
you can close the channel and then break the loop.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With