
GRPC async response stream C#

Tags: c#, grpc

How would I generate streaming response values for an RPC from outside the handler (specifically, from an IObservable)? I'm currently doing the following, but this creates cross-thread issues because AnRxObservable is shared between RPC handlers...

public override async Task GetTicker(
    RequestProto request,
    IServerStreamWriter<ResponseProto> responseStream,
    ServerCallContext context)
{
    // Forward each value pushed by the shared observable to this call's response stream
    var subscription = AnRxObservable.Subscribe(value =>
    {
        responseStream.WriteAsync(new ResponseProto
        {
            Value = value
        });
    });

    // Wait for the RPC to be canceled (my extension method
    // that returns a task that completes when the CancellationToken
    // is cancelled)
    await context.CancellationToken.WhenCancelled();

    // Dispose of the buffered stream
    bufferedStream.Dispose();

    // Dispose subscriber (tells rx that we aren't subscribed anymore)
    subscription.Dispose();
}

This code doesn't feel right... but I can't see any other way of streaming RPC responses from a shared source created outside the RPC handler.

Warrick asked Jul 01 '17 09:07


1 Answer

Generally speaking, when you convert from a push model (IObservable) to a pull model (enumerating the responses and writing them one by one), you need an intermediate buffer for the messages, e.g. a blocking queue. The handler body can then be an async loop that fetches the next message from the queue (preferably asynchronously) and writes it to the responseStream.

Also, be aware that the gRPC API only allows one in-flight response write at any given time, and your snippet doesn't respect that. You need to await each WriteAsync() before starting the next write (and that's another reason why you need an intermediate queue).
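
The two points above (an intermediate queue, and awaiting each write before starting the next) could be sketched roughly as follows. This is only an illustration, not the one correct way to do it: it assumes .NET Core 3.0 or later (for System.Threading.Channels and `await foreach`), and the names ChannelObserver, ObservablePump and PumpAsync are hypothetical helpers that are not part of gRPC or Rx.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: pushes every observed value into a channel (the intermediate queue).
sealed class ChannelObserver<T> : IObserver<T>
{
    private readonly ChannelWriter<T> _writer;

    public ChannelObserver(ChannelWriter<T> writer) => _writer = writer;

    // Called on whatever thread the observable emits on; TryWrite never blocks
    // on an unbounded channel.
    public void OnNext(T value) => _writer.TryWrite(value);

    public void OnError(Exception error) => _writer.TryComplete(error);

    public void OnCompleted() => _writer.TryComplete();
}

static class ObservablePump
{
    // Hypothetical helper: turns the push-based source into a pull loop.
    // writeAsync is awaited for each item, so at most one write is in flight.
    public static async Task PumpAsync<T>(
        IObservable<T> source,
        Func<T, Task> writeAsync,
        CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<T>();

        // Each call gets its own subscription and its own queue,
        // even when the source itself is shared between handlers.
        using (source.Subscribe(new ChannelObserver<T>(channel.Writer)))
        {
            try
            {
                await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
                {
                    await writeAsync(item);
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The call was cancelled: stop writing and let the using block unsubscribe.
            }
        }
    }
}
```

Inside the handler from the question, this might be used as `await ObservablePump.PumpAsync(AnRxObservable, value => responseStream.WriteAsync(new ResponseProto { Value = value }), context.CancellationToken);`. Note that the queue in this sketch is unbounded, so a client that reads slowly lets it grow; a bounded channel with a drop policy may be a better fit for ticker-style data.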

This link might be useful in explaining the push vs pull paradigms: When to use IEnumerable vs IObservable?

Jan Tattermusch answered Sep 28 '22 21:09
