 

Why does my multiprocess Python gRPC server not work?

I instantiate one gRPC server per subprocess using a multiprocess pool. When I use multiple clients to access the server, I run into the following two problems:

  • all client requests are handled by the same server subprocess
  • the client raises a MaybeEncodingError

By the way, my development environment is:

[OS]
ProductName:    Mac OS X
ProductVersion: 10.14.6
BuildVersion:   18G5033

[packages]
grpcio = '1.30.0'
grpcio-tools = '1.30.0'
multiprocess = "0.70.10"
grpcio-status = "1.30.0"
googleapis-common-protos = "1.52.0"

[requires]
python_version = "3.8.3"

Here is the server output:

[PID 83287] Binding to 'localhost:52909'
[PID 83288] Starting new server.
[PID 83289] Starting new server.
[PID 83290] Starting new server.
[PID 83291] Starting new server.
[PID 83292] Starting new server.
[PID 83293] Starting new server.
[PID 83294] Starting new server.
[PID 83295] Starting new server.
[PID 83295] Determining primality of 2
[PID 83295] Determining primality of 9
[PID 83295] Determining primality of 23
[PID 83295] Determining primality of 16
[PID 83295] Determining primality of 10
[PID 83295] Determining primality of 3
[PID 83295] Determining primality of 24
[PID 83295] Determining primality of 17
[PID 83295] Determining primality of 11
[PID 83295] Determining primality of 25
[PID 83295] Determining primality of 4
[PID 83295] Determining primality of 18
[PID 83295] Determining primality of 5
[PID 83295] Determining primality of 12
[PID 83295] Determining primality of 19
[PID 83295] Determining primality of 26
[PID 83295] Determining primality of 6
[PID 83295] Determining primality of 13
[PID 83295] Determining primality of 27
[PID 83295] Determining primality of 20
[PID 83295] Determining primality of 7
[PID 83295] Determining primality of 14
[PID 83295] Determining primality of 8
[PID 83295] Determining primality of 28
[PID 83295] Determining primality of 15

As seen above, all requests are handled by the same server subprocess [PID 83295]. Why?

Here is the client error info:

Traceback (most recent call last):
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 96, in <module>
    main()
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 86, in main
    primes = _calculate_primes(args.server_address)
  File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 71, in _calculate_primes
    primality = worker_pool.map(_run_worker_query, check_range)
  File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
    raise self._value
multiprocess.pool.MaybeEncodingError: Error sending result: '[isPrime: true
, , isPrime: true
, , isPrime: true
, , ]'. Reason: 'PicklingError("Can't pickle <class 'prime_pb2.Primality'>: it's not the same object as prime_pb2.Primality")'

As seen above, the client raises multiprocess.pool.MaybeEncodingError with an underlying PicklingError. Why?

Below is the full code:

// the prime.proto from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing

syntax = "proto3";

package prime;

// A candidate integer for primality testing.
message PrimeCandidate {
    // The candidate.
    int64 candidate = 1;
}

// The primality of the requested integer candidate.
message Primality {
    // Is the candidate prime?
    bool isPrime = 1;
}

// Service to check primality.
service PrimeChecker {
    // Determines the primality of an integer.
    rpc check (PrimeCandidate) returns (Primality) {}
}
# the server.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modified some places.


"""An example of multiprocess concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import multiprocessing
from concurrent import futures
import contextlib
import datetime
import logging
import math
import time
import socket
import sys
from multiprocess import pool
import grpc

from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc

_LOGGER = logging.getLogger(__name__)

_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT


def is_prime(n):
    for i in range(2, int(math.ceil(math.sqrt(n)))):
        if n % i == 0:
            return False
    else:
        return True


class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):

    def check(self, request, context):
        _LOGGER.info('Determining primality of %s', request.candidate)
        return prime_pb2.Primality(isPrime=is_prime(request.candidate))


def _wait_forever(server):
    try:
        while True:
            time.sleep(_ONE_DAY.total_seconds())
    except KeyboardInterrupt:
        server.stop(None)


def _run_server(bind_address):
    """Start a server in a subprocess."""
    _LOGGER.info('Starting new server.')
    options = (('grpc.so_reuseport', 1),)

    server = grpc.server(futures.ThreadPoolExecutor(
        max_workers=_THREAD_CONCURRENCY,),
                         options=options)
    prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
    server.add_insecure_port(bind_address)
    server.start()
    # _wait_forever(server)
    server.wait_for_termination()


@contextlib.contextmanager
def _reserve_port():
    """Find and reserve a port for all subprocesses to use."""
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
        raise RuntimeError("Failed to set SO_REUSEPORT.")
    sock.bind(('', 0))
    try:
        yield sock.getsockname()[1]
    finally:
        sock.close()


def main():
    with _reserve_port() as port:
        bind_address = 'localhost:{}'.format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()
        addrs = [bind_address for _ in range(_PROCESS_COUNT)]
        server_pool = pool.Pool(processes=_PROCESS_COUNT)
        server_pool.map(_run_server, addrs)
        server_pool.close()
        # server_pool.join()

if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()
# the client.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modified some places.


"""An example of multiprocessing concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import atexit
import logging
from multiprocess import pool
import operator
import sys

import grpc

from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc

_PROCESS_COUNT = 4
_MAXIMUM_CANDIDATE = 100

# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None

_LOGGER = logging.getLogger(__name__)


def _shutdown_worker():
    _LOGGER.info('Shutting worker process down.')
    if _worker_channel_singleton is not None:
        _worker_channel_singleton.close()


def _initialize_worker(server_address):
    global _worker_channel_singleton  # pylint: disable=global-statement
    global _worker_stub_singleton  # pylint: disable=global-statement
    _LOGGER.info('Initializing worker process.')
    _worker_channel_singleton = grpc.insecure_channel(server_address)
    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
        _worker_channel_singleton)
    atexit.register(_shutdown_worker)


def _run_worker_query(primality_candidate):
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    return _worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))


def _calculate_primes(server_address):
    worker_pool = pool.Pool(processes=_PROCESS_COUNT,
                                       initializer=_initialize_worker,
                                       initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    primality = worker_pool.map(_run_worker_query, check_range)
    worker_pool.close()
    worker_pool.join()
    primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    return tuple(primes)


def main():
    msg = 'Determine the primality of the first {} integers.'.format(
        _MAXIMUM_CANDIDATE)
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument('--server_address',
                        default='localhost:52909',
                        help='The address of the server (e.g. localhost:50051)')
    args = parser.parse_args()
    primes = _calculate_primes(args.server_address)
    print(primes)


if __name__ == '__main__':
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()

I have tried using multiple channels in the client, following Lidi Zheng's advice, but it still does not seem to work. Is my method wrong?

def _run_worker_query(param):
    server_address, primality_candidate = param
    worker_channel_singleton = grpc.insecure_channel(server_address)
    worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(worker_channel_singleton)
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    res = worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))
    return res.isPrime


def _calculate_primes(server_address):
    # worker_pool = pool.Pool(processes=_PROCESS_COUNT,
    #                                    initializer=_initialize_worker,
    #                                    initargs=(server_address,))
    worker_pool = pool.Pool(processes=_PROCESS_COUNT)
    check_range = range(2, _MAXIMUM_CANDIDATE)
    params = [(server_address, r) for r in check_range]
    primality = worker_pool.map(_run_worker_query, params)
    worker_pool.close()
    worker_pool.join()
    # primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    primes = zip(check_range, primality)
    return tuple(primes)

The server output is:

[PID 43279] Determining primality of 3
[PID 43279] Determining primality of 2
[PID 43279] Determining primality of 4
[PID 43279] Determining primality of 5
[PID 43279] Determining primality of 6
[PID 43279] Determining primality of 7
[PID 43279] Determining primality of 8
[PID 43279] Determining primality of 9
Long.zhao asked Jul 08 '20


1 Answer

Thanks for providing the detailed reproduction case. I'm able to reproduce the first issue, but not the second one.

Your usage of the multiprocess gRPC Python server is correct. Before diving into the details, I want to restate the tricky part of multiprocessing with the gRPC Python server: it does not support forking after the server has started. To enable multiprocessing in gRPC, the application needs to fork early (see example-multiprocessing).
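
For reference, the fork-early pattern from the linked example boils down to roughly the sketch below. It reuses _reserve_port, _run_server, _LOGGER and _PROCESS_COUNT from the server.py above and starts the worker processes with multiprocessing.Process before any grpc.server() exists:

import multiprocessing
import sys


def main():
    with _reserve_port() as port:
        bind_address = 'localhost:{}'.format(port)
        _LOGGER.info("Binding to '%s'", bind_address)
        sys.stdout.flush()
        workers = []
        for _ in range(_PROCESS_COUNT):
            # The fork happens here, before any gRPC server (and its worker
            # threads) has been created in the parent process.
            worker = multiprocessing.Process(target=_run_server,
                                             args=(bind_address,))
            worker.start()
            workers.append(worker)
        for worker in workers:
            worker.join()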

About the first issue, the cause is that the client only created one channel. _initialize_worker uses the global variable _worker_channel_singleton to store the channel, which means there is only one channel despite the existence of multiple processes. One gRPC channel means one TCP connection, and the automatic load balancing provided by SO_REUSEPORT only works at the granularity of TCP connections. Give multiple channels a try; it should solve your issue. (Side note: to load-balance the requests from one single gRPC channel, one can use an L7 load balancer, e.g. L7 ILB.)
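
One way to observe that the balancing happens per connection rather than per RPC is a small probe like the hypothetical sketch below: open several independent channels from a single client process, send one RPC on each, and watch the server log to see which PID handles each request. It only assumes the prime_pb2/prime_pb2_grpc modules generated from the proto above.

import grpc

from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc


def probe(server_address, num_channels=4):
    for i in range(num_channels):
        # A fresh channel means a fresh TCP connection to the shared port.
        with grpc.insecure_channel(server_address) as channel:
            stub = prime_pb2_grpc.PrimeCheckerStub(channel)
            reply = stub.check(prime_pb2.PrimeCandidate(candidate=2 + i))
            print('channel %d -> isPrime=%s' % (i, reply.isPrime))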

About the second issue, protobuf messages are not picklable. To pass protobuf messages across processes, the client needs to serialize them to strings and deserialize them on the other side. This is due to the nature of protobuf: a serialized message can map to many (if not infinitely many) possible original messages, and without the schema it is impossible to decode.
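
As an illustration (a sketch, not the only way to do it), the client.py above could return the serialized bytes from each worker and rebuild the messages in the parent process, reusing _initialize_worker and the module globals as-is; SerializeToString() and Primality.FromString() are standard protobuf methods:

def _run_worker_query(primality_candidate):
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    reply = _worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))
    # bytes pickle without problems, unlike the generated message class
    return reply.SerializeToString()


def _calculate_primes(server_address):
    worker_pool = pool.Pool(processes=_PROCESS_COUNT,
                            initializer=_initialize_worker,
                            initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    raw_replies = worker_pool.map(_run_worker_query, check_range)
    worker_pool.close()
    worker_pool.join()
    # Rebuild the protobuf messages on this side of the process boundary.
    primality = [prime_pb2.Primality.FromString(raw) for raw in raw_replies]
    primes = zip(check_range, (p.isPrime for p in primality))
    return tuple(primes)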

Lidi Zheng answered Sep 30 '22