
gRPC: Rendezvous terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)

I'm implementing a gRPC client and server in Python. The server receives data from the client successfully, but the client receives back "RST_STREAM with error code 2".

What does it actually mean, and how do I fix it?
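(Editor's note, for context: the numeric code in an RST_STREAM frame comes from the HTTP/2 specification, RFC 7540 section 7, not from gRPC itself. A small lookup table of the common codes, added here for reference:)

```python
# HTTP/2 RST_STREAM / GOAWAY error codes, from RFC 7540, section 7.
HTTP2_ERROR_CODES = {
    0x0: "NO_ERROR",
    0x1: "PROTOCOL_ERROR",
    0x2: "INTERNAL_ERROR",    # <- the code the client is receiving here
    0x3: "FLOW_CONTROL_ERROR",
    0x7: "REFUSED_STREAM",
    0x8: "CANCEL",
}

# Error code 2 means the peer's HTTP/2 transport hit an internal
# failure; gRPC surfaces this as StatusCode.INTERNAL.
print(HTTP2_ERROR_CODES[0x2])  # INTERNAL_ERROR
```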

Here's my proto file:

service MyApi {
    rpc SelectModelForDataset (Dataset) returns (SelectedModel) {
    }
}
message Dataset {
    // ...
}
message SelectedModel {
    // ...
}

My service implementation looks like this:

class MyApiServicer(my_api_pb2_grpc.MyApiServicer):
    def SelectModelForDataset(self, request, context):
        print("Processing started.")
        selectedModel = ModelSelectionModule.run(request, context)
        print("Processing Completed.")
        return selectedModel

I start server with this code:

import grpc
from concurrent import futures
#...
server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
my_api_pb2_grpc.add_MyApiServicer_to_server(MyApiServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()

My client looks like this:

channel = grpc.insecure_channel(target='localhost:50051')
stub = my_api_pb2_grpc.MyApiStub(channel)
dataset = my_api_pb2.Dataset() 
# fill the object ...
model = stub.SelectModelForDataset(dataset)  # call server

After the client makes its call, the server starts processing and runs to completion (this takes roughly a minute), but the client returns immediately with the following error:

Traceback (most recent call last):                                                                   
File "Client.py", line 32, in <module>                                                               
    run()                                                                                            
File "Client.py", line 26, in run                                                                    
    model = stub.SelectModelForDataset(dataset)  # call server                                       
File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 484, in __call__
    return _end_unary_response_blocking(state, call, False, deadline)                                
File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 434, in _end_unary_response_blocking                                                                                               
    raise _Rendezvous(state, None, None, deadline)                                                 
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>

If I do the request asynchronously and wait on future,

model_future = stub.SelectModelForDataset.future(dataset)  # call server
model = model_future.result()

the client waits until completion, but after that still returns an error:

Traceback (most recent call last):                                                                   
File "AsyncClient.py", line 35, in <module>                                                          
    run()                                                                                            
File "AsyncClient.py", line 29, in run                                                               
    model = model_future.result()                                                                    
File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 276, in result                  
    raise self                                                                                     
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>

UPD: After enabling tracing with GRPC_TRACE=all, I discovered the following:

Client, immediately after request:

E0109 17:59:42.248727600    1981 channel_connectivity.cc:126] watch_completion_error: {"created":"@1515520782.248638500","description":"GOAWAY received","file":"src/core/ext/transport/chttp2/transport/chttp2_transport.cc","file_line":1137,"http2_error":0,"raw_bytes":"Server shutdown"}            
E0109 17:59:42.451048100    1979 channel_connectivity.cc:126] watch_completion_error: "Cancelled"  
E0109 17:59:42.451160000    1979 completion_queue.cc:659]    Operation failed: tag=0x7f6e5cd1caf8, error={"created":"@1515520782.451034300","description":"Timed out waiting for connection state change","file":"src/core/ext/filters/client_channel/channel_connectivity.cc","file_line":133}
...(last two messages keep repeating 5 times every second)

Server:

E0109 17:59:42.248201000    1985 completion_queue.cc:659]    Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520782.248170000","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}                                                                    
E0109 17:59:42.248541100    1975 tcp_server_posix.cc:231]    Failed accept4: Invalid argument                                                                             
E0109 17:59:47.362868700    1994 completion_queue.cc:659]    Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520787.362853500","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}                                                                                                                                             
E0109 17:59:52.430612500    2000 completion_queue.cc:659]    Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520792.430598800","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}
... (last message kept repeating every few seconds)                                                             

UPD2:

The full content of my Server.py file:

import ModelSelectionModule
import my_api_pb2_grpc
import my_api_pb2
import grpc
from concurrent import futures
import time

class MyApiServicer(my_api_pb2_grpc.MyApiServicer):
    def SelectModelForDataset(self, request, context):
        print("Processing started.")
        selectedModel = ModelSelectionModule.run(request, context)
        print("Processing Completed.")
        return selectedModel


# TODO(shalamov): what is the best way to run a python server?
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
    my_api_pb2_grpc.add_MyApiServicer_to_server(MyApiServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()

    print("gRPC server started\n")
    try:
        while True:
            time.sleep(24 * 60 * 60)  # run for 24h
    except KeyboardInterrupt:
        server.stop(0)


if __name__ == '__main__':
    serve()

UPD3: It seems that ModelSelectionModule.run is causing the problem. I tried to isolate it in a separate thread, but that didn't help. The selectedModel is eventually calculated, but the client is already gone by then. How do I prevent this call from interfering with gRPC?

from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=1)
# The callable and its arguments must be passed separately;
# apply_async(ModelSelectionModule.run(request, context)) would
# invoke the function immediately in the current thread.
async_result = pool.apply_async(ModelSelectionModule.run, (request, context))
selectedModel = async_result.get()

The call is rather complicated: it spawns and joins many threads and calls libraries such as scikit-learn and SMAC, among others. It would be too much to post all of it here.

While debugging, I discovered that after the client's request, the server keeps 2 connections open (fd 3 and fd 8). If I manually close fd 8 or write some bytes to it, the error I see in the client becomes "Stream removed" (instead of "Received RST_STREAM with error code 2"). It seems that the socket (fd 8) somehow gets corrupted by child processes. How is this possible? How can I protect the socket from being accessed by child processes?
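(Editor's note, a stdlib illustration of the fd-inheritance mechanism being asked about, not from the original post: file descriptors can be marked non-inheritable so that children started with fork()+exec() cannot see them. Be aware this alone does not fix the gRPC issue, because a bare fork() without exec() copies the entire fd table regardless of the inheritable flag.)

```python
import os
import subprocess
import sys

# Create a pipe and mark the write end non-inheritable, so a child
# started via fork()+exec() cannot inherit it.
r, w = os.pipe()
os.set_inheritable(w, False)

# The child tries to fstat() the parent's fd. Because subprocess
# closes non-standard fds in the child by default (close_fds=True),
# the fd does not exist there and os.fstat() raises OSError.
child = subprocess.run(
    [sys.executable, "-c", f"import os; os.fstat({w})"],
    capture_output=True,
)
assert child.returncode != 0  # child could not access the parent's fd

os.close(r)
os.close(w)
```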

asked Jan 09 '18 by Viacheslav Shalamov


1 Answer

This is a result of using fork() in the process handler. gRPC Python doesn't support this use case.
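(Editor's note, a common workaround sketched under my own assumptions, not part of the original answer: keep the gRPC server process fork-free by pushing the fork-heavy computation into a completely separate Python process via fork()+exec(), e.g. with `subprocess`, exchanging data over stdin/stdout. The worker gets a fresh process image, so it cannot touch the server's sockets or threads. `WORKER_CODE` below is a hypothetical stand-in for ModelSelectionModule.run; in practice the worker would be its own script and the payload would be the serialized request.)

```python
import json
import subprocess
import sys

# Hypothetical stand-in for the fork-heavy model-selection work;
# in a real setup this would be a separate worker script.
WORKER_CODE = """
import json, sys
data = json.load(sys.stdin)
json.dump({"selected": data["n"] * 2}, sys.stdout)
"""

def run_isolated(payload):
    # fork()+exec() replaces the child's process image, so whatever
    # the worker does internally cannot corrupt the parent's state.
    proc = subprocess.run(
        [sys.executable, "-c", WORKER_CODE],
        input=json.dumps(payload),
        capture_output=True, text=True, check=True,
    )
    return json.loads(proc.stdout)

result = run_isolated({"n": 21})
print(result)  # {'selected': 42}
```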

answered Sep 28 '22 by kpayson64