I instantiate one gRPC server per subprocess using a multiprocess pool. When multiple clients access the server, I run into two problems, described below.
By the way, my development environment is:
[OS]
ProductName: Mac OS X
ProductVersion: 10.14.6
BuildVersion: 18G5033
[packages]
grpcio = '1.30.0'
grpcio-tools = '1.30.0'
multiprocess = "0.70.10"
grpcio-status = "1.30.0"
googleapis-common-protos = "1.52.0"
[requires]
python_version = "3.8.3"
Here is the server output:
[PID 83287] Binding to 'localhost:52909'
[PID 83288] Starting new server.
[PID 83289] Starting new server.
[PID 83290] Starting new server.
[PID 83291] Starting new server.
[PID 83292] Starting new server.
[PID 83293] Starting new server.
[PID 83294] Starting new server.
[PID 83295] Starting new server.
[PID 83295] Determining primality of 2
[PID 83295] Determining primality of 9
[PID 83295] Determining primality of 23
[PID 83295] Determining primality of 16
[PID 83295] Determining primality of 10
[PID 83295] Determining primality of 3
[PID 83295] Determining primality of 24
[PID 83295] Determining primality of 17
[PID 83295] Determining primality of 11
[PID 83295] Determining primality of 25
[PID 83295] Determining primality of 4
[PID 83295] Determining primality of 18
[PID 83295] Determining primality of 5
[PID 83295] Determining primality of 12
[PID 83295] Determining primality of 19
[PID 83295] Determining primality of 26
[PID 83295] Determining primality of 6
[PID 83295] Determining primality of 13
[PID 83295] Determining primality of 27
[PID 83295] Determining primality of 20
[PID 83295] Determining primality of 7
[PID 83295] Determining primality of 14
[PID 83295] Determining primality of 8
[PID 83295] Determining primality of 28
[PID 83295] Determining primality of 15
As seen above, all clients access the same server subprocess [PID 83295]. Why?
Here is the client error info:
Traceback (most recent call last):
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 96, in <module>
main()
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 86, in main
primes = _calculate_primes(args.server_address)
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 71, in _calculate_primes
primality = worker_pool.map(_run_worker_query, check_range)
File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
raise self._value
multiprocess.pool.MaybeEncodingError: Error sending result: '[isPrime: true
, , isPrime: true
, , isPrime: true
, , ]'. Reason: 'PicklingError("Can't pickle <class 'prime_pb2.Primality'>: it's not the same object as prime_pb2.Primality")'
As seen above, it raises multiprocess.pool.MaybeEncodingError and PicklingError. Why?
Below is the full code:
// The prime.proto from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing
syntax = "proto3";

package prime;

// A candidate integer for primality testing.
message PrimeCandidate {
  // The candidate.
  int64 candidate = 1;
}

// The primality of the requested integer candidate.
message Primality {
  // Is the candidate prime?
  bool isPrime = 1;
}

// Service to check primality.
service PrimeChecker {
  // Determines the primality of an integer.
  rpc check (PrimeCandidate) returns (Primality) {}
}
# The server.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# with some places modified.
"""An example of multiprocess concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
from concurrent import futures
import contextlib
import datetime
import logging
import math
import time
import socket
import sys
from multiprocess import pool
import grpc
from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc
_LOGGER = logging.getLogger(__name__)
_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT
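def is_prime(n):
    # Test divisors up to and including floor(sqrt(n)). The original bound,
    # int(math.ceil(math.sqrt(n))), excluded sqrt(n) itself, so squares of
    # primes such as 4, 9, and 25 were reported as prime.
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True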
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):

    def check(self, request, context):
        _LOGGER.info('Determining primality of %s', request.candidate)
        return prime_pb2.Primality(isPrime=is_prime(request.candidate))

def _wait_forever(server):
    try:
        while True:
            time.sleep(_ONE_DAY.total_seconds())
    except KeyboardInterrupt:
        server.stop(None)

def _run_server(bind_address):
    """Start a server in a subprocess."""
    _LOGGER.info('Starting new server.')
    options = (('grpc.so_reuseport', 1),)
    server = grpc.server(futures.ThreadPoolExecutor(
        max_workers=_THREAD_CONCURRENCY,),
                         options=options)
    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
    server.add_insecure_port(bind_address)
    server.start()
    # _wait_forever(server)
    server.wait_for_termination()

@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use."""
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
        raise RuntimeError("Failed to set SO_REUSEPORT.")
    sock.bind(('', 0))
    try:
        yield sock.getsockname()[1]
    finally:
        sock.close()

def main():
    with _reserve_port() as port:
        bind_address = 'localhost:{}'.format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()
        addrs = [bind_address for _ in range(_PROCESS_COUNT)]
        server_pool = pool.Pool(processes=_PROCESS_COUNT)
        server_pool.map(_run_server, addrs)
        server_pool.close()
        # server_pool.join()

if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()
# The client.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# with some places modified.
"""An example of multiprocessing concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import atexit
import logging
from multiprocess import pool
import operator
import sys
import grpc
from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc
_PROCESS_COUNT = 4
_MAXIMUM_CANDIDATE = 100
# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None
_LOGGER = logging.getLogger(__name__)
def _shutdown_worker():
    _LOGGER.info('Shutting worker process down.')
    if _worker_channel_singleton is not None:
        _worker_channel_singleton.close()

def _initialize_worker(server_address):
    global _worker_channel_singleton  # pylint: disable=global-statement
    global _worker_stub_singleton  # pylint: disable=global-statement
    _LOGGER.info('Initializing worker process.')
    _worker_channel_singleton = grpc.insecure_channel(server_address)
    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
        _worker_channel_singleton)
    atexit.register(_shutdown_worker)

def _run_worker_query(primality_candidate):
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    return _worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))

def _calculate_primes(server_address):
    worker_pool = pool.Pool(processes=_PROCESS_COUNT,
                            initializer=_initialize_worker,
                            initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    primality = worker_pool.map(_run_worker_query, check_range)
    worker_pool.close()
    worker_pool.join()
    primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    return tuple(primes)

def main():
    msg = 'Determine the primality of the first {} integers.'.format(
        _MAXIMUM_CANDIDATE)
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument('--server_address',
                        default='localhost:52909',
                        help='The address of the server (e.g. localhost:50051)')
    args = parser.parse_args()
    primes = _calculate_primes(args.server_address)
    print(primes)

if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()
Following Lidi Zheng's advice, I have tried using multiple channels on the client, but it still does not seem to work. Is my approach wrong?
def _run_worker_query(param):
    server_address, primality_candidate = param
    worker_channel_singleton = grpc.insecure_channel(server_address)
    worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(worker_channel_singleton)
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    res = worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))
    return res.isPrime

def _calculate_primes(server_address):
    # worker_pool = pool.Pool(processes=_PROCESS_COUNT,
    #                         initializer=_initialize_worker,
    #                         initargs=(server_address,))
    worker_pool = pool.Pool(processes=_PROCESS_COUNT)
    check_range = range(2, _MAXIMUM_CANDIDATE)
    params = [(server_address, r) for r in check_range]
    primality = worker_pool.map(_run_worker_query, params)
    worker_pool.close()
    worker_pool.join()
    # primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    primes = zip(check_range, primality)
    return tuple(primes)
The server output is:
[PID 43279] Determining primality of 3
[PID 43279] Determining primality of 2
[PID 43279] Determining primality of 4
[PID 43279] Determining primality of 5
[PID 43279] Determining primality of 6
[PID 43279] Determining primality of 7
[PID 43279] Determining primality of 8
[PID 43279] Determining primality of 9
Thanks for providing the detailed reproduction case. I'm able to reproduce the first issue, but not the second one.
Your usage of the multiprocessing gRPC Python server is correct. Before diving into the details, I want to repeat one tricky point about it: a gRPC Python server doesn't support forking after it has started. To enable multiprocessing in gRPC, the application needs to fork early, before any server is started (see example-multiprocessing).
About the first issue: the cause is that the client only created one channel. _initialize_worker uses a global variable, _worker_channel_singleton, to store the channel, which means there is only one channel despite the existence of multiple processes. One gRPC channel means one TCP connection, and the automatic load balancing provided by SO_REUSEPORT only works at the granularity of TCP connections. Give multiple channels a try; that should solve your issue. (Side note: to load balance the requests from a single gRPC channel, one can use an L7 load balancer, e.g., L7 ILB.)
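To make the "granularity of TCP connections" point concrete, here is a rough sketch that uses only the Python standard library instead of gRPC. It binds two listening sockets to the same port with SO_REUSEPORT and counts which listener accepts each incoming connection. The helper names make_listener and count_accepts are made up for this illustration, and the code assumes a platform that exposes socket.SO_REUSEPORT. How the kernel splits connections between the listeners depends on the operating system, so the individual counts are not predictable; only the total is.

```python
import select
import socket


def make_listener(port=0):
    """Create a listening TCP socket with SO_REUSEPORT set before bind()."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('127.0.0.1', port))
    sock.listen(16)
    return sock


def count_accepts(listeners, connections):
    """Open client connections and count how many each listener accepts."""
    port = listeners[0].getsockname()[1]
    clients = [socket.create_connection(('127.0.0.1', port))
               for _ in range(connections)]
    counts = [0] * len(listeners)
    while sum(counts) < connections:
        # Wait until at least one listener has a pending connection.
        ready, _, _ = select.select(listeners, [], [], 1.0)
        if not ready:
            break  # Nothing pending within the timeout; stop waiting.
        for sock in ready:
            conn, _addr = sock.accept()
            conn.close()
            counts[listeners.index(sock)] += 1
    for client in clients:
        client.close()
    return counts


first = make_listener()
second = make_listener(first.getsockname()[1])  # Same port, second socket.
counts = count_accepts([first, second], connections=8)
first.close()
second.close()
print(counts)  # One entry per listener; the entries add up to 8.
```

Each connection is handed to exactly one listener for its whole lifetime, which is why every RPC sent over a single long-lived channel ends up in the same server process.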
About the second issue: protobuf messages are not picklable. To pass protobuf messages across processes, the client needs to serialize them to a byte string on one side and deserialize them on the other. This is due to the nature of protobuf: a serialized message can map to many (if not infinitely many) possible original messages, so without a schema it is impossible to decode.
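A minimal sketch of that serialize/deserialize round trip is shown here. To keep it runnable without the generated code, the Primality class in it is a hypothetical stand-in for prime_pb2.Primality, not the real generated class; it only mimics the SerializeToString() and FromString() methods that generated protobuf messages provide. The helper names worker_result and parent_decode are also invented for the illustration.

```python
import pickle
import struct


class Primality:
    """Hypothetical stand-in for prime_pb2.Primality (not the generated class)."""

    def __init__(self, isPrime=False):
        self.isPrime = isPrime

    def SerializeToString(self):
        # Encode the single boolean field as one byte.
        return struct.pack('?', self.isPrime)

    @classmethod
    def FromString(cls, data):
        (flag,) = struct.unpack('?', data)
        return cls(isPrime=flag)


def worker_result(candidate_is_prime):
    """What a pool worker would return: bytes instead of the message object."""
    return Primality(isPrime=candidate_is_prime).SerializeToString()


def parent_decode(payload):
    """Rebuild the message in the parent process from the received bytes."""
    return Primality.FromString(payload)


payload = worker_result(True)
# A process pool pickles every result; plain bytes survive that unchanged.
received = pickle.loads(pickle.dumps(payload))
decoded = parent_decode(received)
print(decoded.isPrime)
```

In the real client, _run_worker_query would return the response's SerializeToString() output and the parent would call prime_pb2.Primality.FromString on each item. Returning only the plain field value, as in res.isPrime, avoids the problem in the same way.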