
How to properly use ctx.Done() when client is disconnected?

Tags: go, grpc, channel

If a client is disconnected by a network error, the server must close the connection it holds on that client's behalf (in my case, a pub/sub connection). I know about the ctx.Done() function, but I don't know how to use it properly in my case. Can somebody explain, please?

grpc-go: 1.7.0

go version go1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    for {
        msg, err := pubsub.ReceiveMessage()
        if err != nil {
            grpclog.Warningf("Notifications: pubsub error: %v", err)
            return grpc.Errorf(codes.Internal, "pubsub error %v", err)
        }

        notification := &pb.Notification{}
        err = json.Unmarshal([]byte(msg.Payload), notification)
        if err != nil {
            grpclog.Warningf("Notifications: parse error: %v", err)
            continue
        }
        if err := stream.Send(notification); err != nil {
            grpclog.Warningf("Notifications: %v", err)
            return err
        }
        grpclog.Infof("Notifications: send msg %v", notification)
    }
}
pendolf asked Oct 30 '22 00:10



1 Answer

You can use select. Instead of reading the data directly from a blocking function call, have a goroutine read it and pass it over a channel, then select on that channel and on ctx.Done(). Something like this:

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    // Runs on every return path. Closing pubsub should make a blocked
    // ReceiveMessage() return an error, which lets the goroutine below exit.
    defer pubsub.Close()

    // I cannot build the code, so I assume ReceiveMessage() returns a *redis.Message.
    c := make(chan *redis.Message)
    go func() {
        // A closed channel tells the loop below that receiving has failed.
        defer close(c)
        for {
            msg, err := pubsub.ReceiveMessage()
            if err != nil {
                grpclog.Warningf("Notifications: pubsub error: %v", err)
                return
            }
            select {
            case c <- msg:
            case <-ctx.Done():
                // Nobody is reading c any more; do not block forever.
                return
            }
        }
    }()

    for {
        select {
        case msg, ok := <-c:
            if !ok {
                // The channel is closed: ReceiveMessage() returned an error.
                return grpc.Errorf(codes.Internal, "pubsub error")
            }
            notification := &pb.Notification{}
            if err := json.Unmarshal([]byte(msg.Payload), notification); err != nil {
                grpclog.Warningf("Notifications: parse error: %v", err)
                continue
            }
            if err := stream.Send(notification); err != nil {
                grpclog.Warningf("Notifications: %v", err)
                return err
            }
            grpclog.Infof("Notifications: send msg %v", notification)
        case <-ctx.Done():
            // The client disconnected or the RPC was cancelled. Returning
            // triggers the deferred pubsub.Close(); without it the goroutine
            // would run until the end of main(), which is not what you want.
            return ctx.Err()
        }
    }
}

Read the messages (I assume the type is Message) from the channel, and let select choose between a new message and ctx.Done().

Two other related things in this scenario:

  1. Make sure the goroutine ends when this function returns. I cannot be sure without knowing the rest of the code, but I assume pubsub has a Close() method that makes the next ReceiveMessage() return an error. The deferred pubsub.Close() in your code should do that job.

  2. If ReceiveMessage() returns an error before ctx.Done() fires, close the channel and break out of the loop.

fzerorubigd answered Dec 03 '22 23:12
