Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why doesn't IServerStreamWriter send notification responses?

I work on a cross-platform application. For connection between them, I use gRPC technology. When a client connects to server, it is added to an observers list located in server implementation. When a client connects, I want to send a message to the rest of connected clients telling them that a new client connected. The problem is that when I want to send a response to clients that a new client connected, using the observers from my list, I get the following exception:

Grpc.Core.RpcException: 'Status(StatusCode=Unknown, Detail="Exception was thrown by handler.")'

This is my proto file where I declared my server:

syntax = "proto3";

package com.example.grpc.chat;

message ChatMessage {
    string from = 1;
    string message = 2;
}

message ChatMessageFromServer {
    ChatMessage message = 2;
}

service ChatService {
    rpc Login(ChatMessage ) returns (stream ChatMessageFromServer);
}

The server code :

 public class ChatServiceImpl : ChatService.ChatServiceBase
    {
        private static HashSet<IServerStreamWriter<ChatMessageFromServer>> responseStreams = new HashSet<IServerStreamWriter<ChatMessageFromServer>>();

        /*
         * if the stream object (from "for" statement inside this method) isn't the responseStream object given in the list with parameters,
         * the rest of clients aren't notified when a new login request is pushed.
         */
        public override async Task Login(global::Com.Example.Grpc.Chat.ChatMessage request,
            IServerStreamWriter<global::Com.Example.Grpc.Chat.ChatMessageFromServer> responseStream, 
            ServerCallContext context)
        {
            Console.WriteLine("Login method from server");
            responseStreams.Add(responseStream);

            // Create a server message that wraps the client message
            var message = new ChatMessageFromServer
            {
                Message = new ChatMessage
                {
                    From = "login",
                    Message = "hello"
                }
            };
            // If stream variable isn't equal to responseStream from list of parameters, the client corresponding to that stream isn't notified and it's thrown the above exception
            foreach (var stream in responseStreams)
            {
                await stream.WriteAsync(message);
            }
        }

    }

The client code where the client send a login request:

public partial class ChatForm : Form
    {
        private const string Host = "localhost";
        private const int Port = 9090;

        private ChatService.ChatServiceClient _chatService;

        public ChatForm()
        {
            //InitializeComponent();
            InitializeGrpc();
        }

        private void InitializeGrpc()
        {
            // Create a channel
            var channel = new Channel(Host + ":" + Port, ChannelCredentials.Insecure);

            // Create a client with the channel
            _chatService = new ChatService.ChatServiceClient(channel);
        }

        private async void ChatForm_Load(object sender, EventArgs e)
        {

            var message = new ChatMessage
            {
                From = "Unknown",
                Message = "Login text"
            };

            // Open a connection to the server
            try
            {
                using (var call = _chatService.Login(message))
                {
                    // Read messages from the response stream
                    while (await call.ResponseStream.MoveNext(CancellationToken.None))
                    {
                        var serverMessage = call.ResponseStream.Current;
                        var otherClientMessage = serverMessage.Message;
                        var displayMessage = string.Format("{0}:{1}{2}", otherClientMessage.From, otherClientMessage.Message, Environment.NewLine);
                        chatTextBox.Text += displayMessage;
                    }

                }
            }
            catch (RpcException )
            {

                throw;
            }
        }
    }
like image 640
Catalin Horia Avatar asked Jun 06 '26 02:06

Catalin Horia


1 Answers

Your notifyObservers method is asynchronous but has a void return type, which means you can't await it. You're effectively starting the method, and returning as soon as you hit the first await operator that uses an incomplete awaitable (the first WriteAsync call, probably).

You then return the task with a ReservationResponse, and the operation completes.

When that first awaitable call completes, notifyObservers will continue, but at that point the operation has already completed, so when you try to write to the response stream, the system will throw the error you're seeing.

I strongly suspect you should return a Task from notifyObservers and await that from your main entry method:

// Names changed to be conventional C#
public override async Task<ReservationResponse> SaveReservation(
    global::Res.Protocol.ReservationRequest request, ServerCallContext context)
{
    // some code for saving my reservation in repository database
    ReservationResponse response = new Res.Protocol.ReservationResponse
    {
        Type = ReservationResponse.Types.Type.Savereservation,
        Journey = GetProtoJourney(journey)
    };

    await NotifyObserversAsync(response);

    // Note: no Task.FromResult, as you're in an async method. The response
    // will already be wrapped in a task.
    return new ReservationResponse
    {
        Type = ReservationResponse.Types.Type.Savereservation
    };
}

public async Task NotifyObserversAsync(Res.Protocol.ReservationResponse response)
{
    foreach (var ob in responseStreams)
    {
        await ob.WriteAsync(response);
    }
}
like image 168
Jon Skeet Avatar answered Jun 08 '26 16:06

Jon Skeet