
Python Kafka multiprocess vs thread

I can use KafkaConsumer to consume messages in separate threads.

However, when I use multiprocessing.Process instead of threading.Thread, I get an error:

OSError: [Errno 9] Bad file descriptor

This question and the documentation suggest that using multiprocessing to consume messages in parallel is possible. Would someone please share a working example?

Edit

Here's some sample code. Sorry, the original code is too involved to post, so I created this sample, which I hope shows what is happening. This code works fine if I use threading.Thread instead of multiprocessing.Process.

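from multiprocessing import Process

from kafka import KafkaConsumer  # kafka-python

class KafkaWrapper():
    def __init__(self):
        # The consumer (and its sockets) is created here, i.e. in the parent
        # process, when ServiceA() / ServiceB() are constructed in main().
        self.consumer = KafkaConsumer(bootstrap_servers='my.server.com')

    def consume(self, topic):
        self.consumer.subscribe([topic])
        for message in self.consumer:
            print(message.value)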

class ServiceInterface():
    def __init__(self):
        self.kafka_wrapper = KafkaWrapper()

    def start(self, topic):
        self.kafka_wrapper.consume(topic)

class ServiceA(ServiceInterface):
    pass

class ServiceB(ServiceInterface):
    pass


def main():

    serviceA = ServiceA()
    serviceB = ServiceB()

    jobs=[]
    # The code works fine if I used threading.Thread here instead of Process
    jobs.append(Process(target=serviceA.start, args=("my-topic",)))
    jobs.append(Process(target=serviceB.start, args=("my-topic",)))

    for job in jobs:
        job.start()

    for job in jobs:
        job.join()

if __name__ == "__main__":
    main()

And here's the error I see (again, my actual code differs from the sample above; it works fine with threading.Thread but not with multiprocessing.Process):

  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "service_interface.py", line 58, in start
    self._kafka_wrapper.start_consuming(self.service_object_id)
  File "kafka_wrapper.py", line 141, in start_consuming
    for message in self._consumer:
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in __next__
    return next(self._iterator)
  File "venv/lib/python3.6/site-packages/kafka/consumer/group.py", line 1022, in _message_generator
    self._client.poll(timeout_ms=poll_ms, sleep=True)
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 556, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "venv/lib/python3.6/site-packages/kafka/client_async.py", line 573, in _poll
    ready = self._selector.select(timeout)
  File "/usr/local/Cellar/python3/3.6.2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/selectors.py", line 577, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
OSError: [Errno 9] Bad file descriptor
Deven asked Oct 23 '25 17:10



1 Answer

Kafka consumers can run either in multiple processes or in multiple threads; the choice is up to you. (Make sure the client library you use correctly supports Kafka consumer groups, which was a concern with early versions of Kafka.)

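However, if you use processes, the Kafka client has to be used in a fork-safe way: the underlying TCP connections to the Kafka servers must not be shared by more than one process. In your sample, each KafkaConsumer is created in the parent process (in KafkaWrapper.__init__) and then inherited by the child processes, and this is why you get the error. On macOS, where your traceback comes from, the client's selector is backed by kqueue, and a kqueue descriptor is not inherited by a child created with fork(), so select() in the child fails with "[Errno 9] Bad file descriptor".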

As a workaround, do not create the KafkaConsumer before starting the processes. Instead, create it inside each process.
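A minimal sketch of that workaround, assuming the kafka-python client used in the question. The names make_kafka_consumer and consume, and the group_id value "my-group", are hypothetical; the consumer_factory and handle parameters exist only so the worker can be exercised without a broker. The parent passes nothing but the topic name to each child, and each child builds its own consumer.

```python
from multiprocessing import Process


def make_kafka_consumer():
    # Imported and constructed here, so the consumer's sockets and selector
    # are created inside whichever process calls this function.
    from kafka import KafkaConsumer  # kafka-python

    return KafkaConsumer(
        bootstrap_servers="my.server.com",
        group_id="my-group",  # hypothetical consumer group name
    )


def consume(topic, consumer_factory=make_kafka_consumer, handle=print):
    """Process target: build a fresh consumer in the child, then consume."""
    consumer = consumer_factory()
    consumer.subscribe([topic])
    try:
        for message in consumer:
            handle(message.value)
    finally:
        consumer.close()


def main():
    # Only the topic name (a plain string) is sent to each child process;
    # no KafkaConsumer exists in the parent.
    jobs = [Process(target=consume, args=("my-topic",)) for _ in range(2)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()


if __name__ == "__main__":
    main()
```

In the question's structure, this means creating the KafkaConsumer inside start() (or consume()) rather than in KafkaWrapper.__init__. With a shared group_id, the processes join the same consumer group and split the topic's partitions between them; if each service should see every message, give each one a different group_id instead.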

Another option is to use a single thread or process to fetch messages, and hand them to a separate process pool that does the actual work.
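A rough sketch of that second approach, again assuming kafka-python. handle_value stands in for the real work and is hypothetical, as are the topic name, server, and max_workers=4. Only the single fetching process owns a consumer; the pool workers receive plain message payloads (bytes), never the consumer itself.

```python
from concurrent.futures import ProcessPoolExecutor


def handle_value(value):
    # Hypothetical "real work"; runs in a pool worker process.
    # Only the message payload (bytes) crosses the process boundary.
    return len(value)


def dispatch_batch(records_by_partition, pool):
    """Send one poll() result to the pool; return results in fetch order."""
    values = [
        record.value
        for records in records_by_partition.values()
        for record in records
    ]
    return list(pool.map(handle_value, values))


def main():
    from kafka import KafkaConsumer  # kafka-python

    # The only consumer lives in this single fetching process.
    consumer = KafkaConsumer("my-topic", bootstrap_servers="my.server.com")
    with ProcessPoolExecutor(max_workers=4) as pool:
        while True:
            # poll() returns {TopicPartition: [ConsumerRecord, ...]}
            batch = consumer.poll(timeout_ms=1000)
            for result in dispatch_batch(batch, pool):
                print(result)
```

Call main() under an if __name__ == "__main__": guard, as the question's script does, so that pool workers started with the spawn method do not re-run it. Because each poll() batch is finished before the next poll(), the amount of in-flight work stays bounded by the batch size (max_poll_records in kafka-python).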

Jacky1205 answered Oct 26 '25 06:10
