I need to process millions of users. I have millions of user_ids; for each one I fetch the user data with an HTTP request and write it to a file.
I am using multiprocessing to process batches of these tasks, and multithreading inside each process to execute the individual tasks in a batch. This significantly improves performance and lets me process more users at a faster rate.
Problem:
After a certain amount of time, all the processes become inactive. I can see this in Activity Monitor: at the beginning they use a lot of CPU and have several threads, but after a while they sit idle and my program hangs.
import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count
PROCESSES = multiprocessing.cpu_count() - 1
def get_tweet_objects(user, counter, lock, proc):
    # Removed (calls an HTTP request and writes a JSON file to disk)
    lock.acquire()
    try:
        counter.value = counter.value + 1
    finally:
        lock.release()
    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)
def add_tasks(task_queue, tasks):
    for task in tasks:
        task_queue.put(task)
    return task_queue
def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    logger.info(f'Process {proc} completed successfully')
    return True
def manage_queue(task_queue, counter, lock, proc):
    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()
def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""
    # Set the number of threads.
    number_of_threads = 5
    # Initializes the queue.
    task_queue = Queue()
    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
            task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    for batch in batches:
        task_queue.put(batch)
    task_queue.join()
def run():
    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]
    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)
    batches = create_batches_of_100(existing_users)
    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')
if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()
There are multiple issues with the code. First of all, avoid infinite loops at all costs, like the one in the manage_queue function. Note: I don't mean "avoid while True:", because while True: is not necessarily an infinite loop (for example, you can have a break inside it).
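To illustrate the difference, here is a minimal sketch of a `while True:` worker loop that is not infinite because it has an explicit `break`. It uses a sentinel object (`_STOP`, a hypothetical name) as the exit signal; this is an alternative to the `get(False)`/`queue.Empty` approach recommended in this answer, not that approach itself. `run_with_sentinels` and the doubling "work" are placeholders.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel meaning "no more work"


def worker(task_queue, results):
    """Loop until the sentinel arrives, so `while True` here is not infinite."""
    while True:
        item = task_queue.get()
        try:
            if item is _STOP:
                break  # explicit exit condition
            results.append(item * 2)  # placeholder for the real work
        finally:
            task_queue.task_done()  # runs even on break or on an exception


def run_with_sentinels(items, n_threads=3):
    task_queue = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(task_queue, results))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for item in items:
        task_queue.put(item)
    for _ in threads:
        task_queue.put(_STOP)  # one sentinel per thread
    for t in threads:
        t.join()  # returns because every worker breaks out of its loop
    return results
```

Every thread receives exactly one sentinel, so every thread terminates and `join()` cannot hang.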
With that being said, the biggest problem (which we discovered in a long discussion in chat) is that the get_tweet_objects() function sometimes fails with an exception. When that happens, the worker thread dies, task_queue.task_done() is never called for that item, and therefore task_queue.join() never returns.
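The failure mode can be reproduced with a small, self-contained sketch (assumed, not taken from the real code). `flaky_task` is a hypothetical stand-in for `get_tweet_objects()` that raises on one input. In `worker_buggy` the exception kills the thread before `task_done()` runs; `worker_fixed` moves `task_done()` into a `finally` block. The helper `join_finishes` runs `join()` in a separate thread so the hang can be detected with a timeout instead of freezing the script.

```python
import queue
import threading


def flaky_task(item):
    # Hypothetical stand-in for get_tweet_objects(); fails on one input.
    if item == 3:
        raise ValueError("simulated HTTP failure")


def worker_buggy(task_queue):
    while True:
        try:
            item = task_queue.get_nowait()
        except queue.Empty:
            break
        flaky_task(item)        # an exception here kills the thread...
        task_queue.task_done()  # ...so this line is never reached for item 3


def worker_fixed(task_queue):
    while True:
        try:
            item = task_queue.get_nowait()
        except queue.Empty:
            break
        try:
            flaky_task(item)
        except Exception as exc:
            print(f"task {item} failed: {exc}")
        finally:
            task_queue.task_done()  # always runs, even after an exception


def join_finishes(worker, items, timeout=1.0):
    """Return True if task_queue.join() returns within `timeout` seconds."""
    task_queue = queue.Queue()
    for item in items:
        task_queue.put(item)
    threading.Thread(target=worker, args=(task_queue,), daemon=True).start()
    # Run join() in a helper thread so a hang is observable, not blocking.
    joiner = threading.Thread(target=task_queue.join, daemon=True)
    joiner.start()
    joiner.join(timeout)
    return not joiner.is_alive()
```

`join_finishes(worker_buggy, [1, 2, 3, 4, 5])` returns `False` (the dead thread's traceback is printed to stderr), while `join_finishes(worker_fixed, [1, 2, 3, 4, 5])` returns `True`.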
Another issue is that combining while not task_queue.empty(): with task_queue.get() is a race condition. What happens when two parallel threads run and task_queue has exactly one element? Both see a non-empty queue, one of them takes the element, and the other blocks forever in get(). This should be replaced with task_queue.get(False) and appropriate queue.Empty handling. It may look cosmetic, but the point is that checking for an item and taking it then happen atomically inside the single .get() call. With that change you also need to fill the queue before spawning the threads; otherwise a thread could find the queue empty and exit before any work has been added.
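A minimal sketch of the race-free pattern, assuming a plain `queue.Queue` and threads (the helper names `drain` and `process_all` are hypothetical). The queue is filled before the threads start, and each thread exits on `queue.Empty` instead of testing `empty()` first, so even eight threads competing for a single item cannot block.

```python
import queue
import threading
from collections import Counter


def drain(task_queue, seen, lock):
    """Take items until the queue is empty; never blocks."""
    while True:
        try:
            item = task_queue.get(False)  # same as get_nowait()
        except queue.Empty:
            break  # another thread may have taken the last item; just exit
        with lock:
            seen[item] += 1  # record that this item was processed
        task_queue.task_done()


def process_all(items, n_threads=8):
    task_queue = queue.Queue()
    for item in items:  # fill BEFORE starting the threads
        task_queue.put(item)
    seen = Counter()
    lock = threading.Lock()
    threads = [threading.Thread(target=drain, args=(task_queue, seen, lock))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return seen
```

`process_all(range(1000))` returns a `Counter` in which each of the 1000 items appears exactly once, and `process_all([1])` returns immediately instead of hanging.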
All in all, here are the changes:
from queue import Empty
def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""
    # Set the number of threads.
    number_of_threads = 5
    # Initializes the queue and fills it before any thread starts,
    # so no worker finds it empty and exits too early.
    task_queue = Queue()
    for batch in batches:
        task_queue.put(batch)
    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
            task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()
def manage_queue(task_queue, counter, lock, proc):
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            task_queue.task_done()
def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
    logger.info(f'Process {proc} completed successfully')
    return True
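With that being said, I strongly advise using process/thread executors (concurrent.futures.ProcessPoolExecutor and ThreadPoolExecutor) instead of managing processes, threads and queues by hand.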
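As a rough sketch of that suggestion (not a drop-in replacement): each batch is sent to a `ProcessPoolExecutor`, and inside each process a `ThreadPoolExecutor` handles the users of that batch. `fetch_user` is a hypothetical stand-in for the removed HTTP/JSON code, and `THREADS_PER_PROCESS`, `process_batch` and `run_all` are illustrative names. Because `future.result()` re-raises any exception from the worker, a failing request is recorded instead of silently killing a thread, and per-batch counts are returned to the parent, so the shared `Value`/`Lock` counter is no longer needed.

```python
import os
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

THREADS_PER_PROCESS = 5  # mirrors number_of_threads in the question


def fetch_user(user):
    """Hypothetical stand-in for the HTTP request + JSON write."""
    return user["user_id"]


def process_batch(batch):
    """Run one batch on a thread pool; return (ok_count, errors)."""
    ok, errors = 0, []
    with ThreadPoolExecutor(max_workers=THREADS_PER_PROCESS) as pool:
        futures = {pool.submit(fetch_user, user): user for user in batch}
        for fut in as_completed(futures):
            try:
                fut.result()  # re-raises any exception from fetch_user
                ok += 1
            except Exception as exc:
                errors.append((futures[fut], repr(exc)))
    return ok, errors


def run_all(batches, processes=None):
    """Distribute batches over processes and aggregate their results."""
    processes = processes or max(1, (os.cpu_count() or 2) - 1)
    total_ok, all_errors = 0, []
    with ProcessPoolExecutor(max_workers=processes) as pool:
        for ok, errors in pool.map(process_batch, batches):
            total_ok += ok
            all_errors.extend(errors)
    return total_ok, all_errors
```

Call `run_all(batches)` from inside an `if __name__ == '__main__':` guard, as the original script already does, so that the worker processes can be started safely on platforms that use the spawn start method (such as macOS).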