Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to stop my processes from idling or being killed?

I need to process millions of users. I have millions of user_ids, I fetch the user data from a http request and write to a file.

I am using multiprocessing to perform a batch of these task. I then use multithreading in each process to execute a task in a batch. This significantly improves the performance and enables to me to process more users at a faster rate.

Problem:

I find after a certain amount of time all the processes are becoming inactive. I know this by looking at the activity monitor. At the beginning i can see they use a lot of cpu and have threads, after a while they seem idle and my program hangs.

import os
import time
import logging
import multiprocessing
import config
import json
from google.cloud import storage
from pymongo import MongoClient, UpdateOne
from queue import Queue
import threading
from multiprocessing import Pool, cpu_count

PROCESSES = multiprocessing.cpu_count() - 1

def get_tweet_objects(user, counter, lock, proc):

   # Removed ( calls a http request and writes json file to disk

    lock.acquire()
      try:
        counter.value = counter.value + 1
      finally:
        lock.release()

    print("APP ID: {app_id}, REMAINING: {app_remaining}, TOTAL USERS: {total_users}, USER: {user_id}, NO OF TWEETS: {no_tweets}, TIME TAKEN: {time_taken}"
          .format(app_id=app.APP_ID, app_remaining=0, total_users=counter.value, user_id=user["user_id"], no_tweets=len(total_tweets), time_taken=round((end - start), 2)), threading.current_thread().name, proc)


def add_tasks(task_queue, tasks):

    for task in tasks:
        task_queue.put(task)

    return task_queue


def process_tasks(task_queue, counter, lock):

    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while not task_queue.empty():
        try:
            user = task_queue.get()
            do_multithreading(user, counter, lock, proc)

        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True


def manage_queue(task_queue, counter, lock, proc):

    while True:
        user = task_queue.get()
        get_tweet_objects(user, counter, lock, proc)
        task_queue.task_done()


def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    task_queue = Queue()

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()

    for batch in batches:
        task_queue.put(batch)
    task_queue.join()


def run():

    mongodb = MongoClient(host=config.MONGO_URI)["twitter"]

    existing_users = mongodb[SCREEN_NAME].find({}).limit(10000)

    batches = create_batches_of_100(existing_users)

    empty_task_queue = multiprocessing.Manager().Queue()
    full_task_queue = add_tasks(empty_task_queue, batches)
    processes = []

    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    print(f'Running with {PROCESSES} processes!')
    start = time.time()
    for w in range(PROCESSES):
        p = multiprocessing.Process(
            target=process_tasks, args=(full_task_queue, counter, lock))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f'Time taken = {time.time() - start:.10f}')


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.ERROR)
    run()
like image 695
Kay Avatar asked Nov 28 '25 02:11

Kay


1 Answers

So there are multiple issues with the code. First of all avoid infinite loops at all costs like in manage_queue function. Note: I don't mean "avoid while True:", because it doesn't mean that it is an infinite loop (for example you can have break inside it).

With that being said the biggest problem (which we've discovered in long discussion in chat) is that get_tweet_object() function sometimes fails with an exception and when that happens task_queue.task_done() is never called and therefore task_queue.join() never exits.

Another issue is that mixing while not task_queue.empty(): with task_queue.get() is a race condition. What happens when two parallel threads run and task_queue has exactly 1 element? One of them will hang forever. This should be replaced with task_queue.get(False) with appropriate queue.Empty catching. It looks like cosmetics, but the fact is that the race condition is dealt with in .get() call. With that you also need to fill the queue before spawning threads.

All in all here are changes:

from queue import Empty

def do_multithreading(batches, counter, lock, proc):
    """Starts the multithreading"""

    # Set the number of threads.
    number_of_threads = 5

    # Initializes the queue.
    for batch in batches:
        task_queue.put(batch)

    # Starts the multithreading
    for i in range(number_of_threads):
        t = threading.Thread(target=manage_queue, args=[
                             task_queue, counter, lock, proc])
        t.daemon = True
        t.start()
    task_queue.join()

def manage_queue(task_queue, counter, lock, proc):
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break

        try:
            get_tweet_objects(user, counter, lock, proc)
        except Exception as exc:
            print(exc)
        finally:
            task_queue.task_done()

def process_tasks(task_queue, counter, lock):
    logger = multiprocessing.get_logger()
    proc = os.getpid()
    while True:
        try:
            user = task_queue.get(False)
        except Empty:
            break
        try:
            do_multithreading(user, counter, lock, proc)
        except Exception as e:
            logger.error(e)
        logger.info(f'Process {proc} completed successfully')
    return True

With that being said I strongly advice utilizing process/thread executors.

like image 143
freakish Avatar answered Nov 29 '25 17:11

freakish



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!