
How to properly use ctx.Done() when client is disconnected?





If a client is disconnected by a network error, the server must close its connection, which in my case is a pub/sub connection. I know about ctx.Done(), but I don't know how to use it properly here. Can somebody explain, please?

grpc-go: 1.7.0

go version go1.8.4

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    for {
        msg, err := pubsub.ReceiveMessage()
        if err != nil {
            grpclog.Warningf("Notifications: pubsub error: %v", err)
            return grpc.Errorf(codes.Internal, "pubsub error %v", err)
        }

        notification := &pb.Notification{}
        err = json.Unmarshal([]byte(msg.Payload), notification)
        if err != nil {
            grpclog.Warningf("Notifications: parse error: %v", err)
        }
        if err := stream.Send(notification); err != nil {
            grpclog.Warningf("Notifications: %v", err)
            return err
        }
        grpclog.Infof("Notifications: send msg %v", notification)
    }
}

pendolf asked Oct 30 '22 00:10


1 Answer

You can use select. Instead of reading the data directly from a function call inside the loop, receive it in a goroutine, pass it through a channel, and select on that channel together with ctx.Done(). Something like this:

func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")
    }

    pubsub := a.redisClient.Subscribe("notifications")
    // Runs on every return path; closing pubsub should make the blocked
    // ReceiveMessage() below return an error, so the goroutine ends too.
    defer pubsub.Close()

    // I cannot build the code, so I assume msg is a *redis.Message (go-redis).
    c := make(chan *redis.Message)
    go func() {
        // Closing c tells the loop below that receiving has failed.
        defer close(c)
        for {
            msg, err := pubsub.ReceiveMessage()
            if err != nil {
                // A goroutine cannot return a value to the caller, so just log.
                grpclog.Warningf("Notifications: pubsub error: %v", err)
                return
            }
            select {
            case c <- msg:
            case <-ctx.Done():
                // Nobody reads c any more; do not block forever.
                return
            }
        }
    }()

    for {
        select {
        case msg, ok := <-c:
            if !ok {
                // Channel is closed: ReceiveMessage() failed.
                return grpc.Errorf(codes.Internal, "pubsub error")
            }
            notification := &pb.Notification{}
            err := json.Unmarshal([]byte(msg.Payload), notification)
            if err != nil {
                grpclog.Warningf("Notifications: parse error: %v", err)
            }
            if err := stream.Send(notification); err != nil {
                grpclog.Warningf("Notifications: %v", err)
                return err
            }
            grpclog.Infof("Notifications: send msg %v", notification)
        case <-ctx.Done():
            // The client is gone or the call was cancelled. Returning runs
            // the deferred pubsub.Close(); without it the goroutine would
            // run until the end of main(), which is not what you want.
            return ctx.Err()
        }
    }
}

The loop reads each message from the channel (I assume its type is Message) and uses select to wait on the channel and on ctx.Done() at the same time, so whichever happens first wins.

Two other related points in this scenario:

  1. Make sure the goroutine ends once this function returns. I cannot tell for sure without knowing the rest of the code, but I assume the pubsub has a Close() method that makes the next ReceiveMessage() return an error. (The defer in your code should already do that job, I hope.)

  2. If ReceiveMessage() returns an error before ctx.Done() fires, you can close the channel in the goroutine and then break out of the loop.

fzerorubigd answered Dec 03 '22 23:12
