
How to avoid starting hundreds of threads when scheduling (very short) actions at different times in the future

I use this method to launch a few dozen (fewer than a thousand) calls of do_it at different times in the future:

import threading
timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])    # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else() # this can last from 5 ms to 20 seconds

Each do_it call is very fast (well under 0.1 ms) and non-blocking. I would like to avoid spawning hundreds of new threads for such a simple task.

How could I do this with only one additional thread for all do_it calls?

Is there a simple way to do this in Python, using only the standard library and no third-party packages?

asked Sep 17 '21 by Basj

2 Answers

As I understand it, you want a single worker thread that can process submitted tasks, not in the order they are submitted, but rather in some prioritized order. This seems like a job for the thread-safe queue.PriorityQueue.

from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class PrioritizedItem:
    priority: float  # float, since priorities like i * 0.010 are submitted
    item: Any = field(compare=False)


def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()

This code assumes you want to run forever. If not, you can add a timeout to the q.get in thread_worker and return when the queue.Empty exception is raised because the timeout expired. That way you can join the queue/thread after all the jobs have been processed and the timeout has elapsed.
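A minimal, self-contained sketch of that timeout variant (the 1-second timeout and the results list are illustrative choices, not part of the original answer):

```python
from dataclasses import dataclass, field
from queue import Empty, PriorityQueue
from threading import Thread
from typing import Any


@dataclass(order=True)
class PrioritizedItem:
    priority: float
    item: Any = field(compare=False)


results = []


def do_it(x):
    results.append(x)


def thread_worker(q):
    while True:
        try:
            # Give up after 1 second with no new work, so the thread can exit.
            task = q.get(timeout=1.0)
        except Empty:
            return
        do_it(task.item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
for i in range(5):
    q.put(PrioritizedItem(priority=i * 0.010, item=i))
q.join()  # wait until every submitted task has been processed
t.join()  # the worker returns once the 1-second timeout expires
```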

If you want to wait until some specific time in the future to run the tasks, it gets a bit more complicated. Here's an approach that extends the above approach by sleeping in the worker thread until the specified time has arrived, but be aware that time.sleep is only as accurate as your OS allows it to be.

from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue


@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any = field(compare=False)


def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()


q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()

To address this problem using only a single extra thread we have to sleep in that thread, so it's possible that new tasks with higher priority could come in while the worker is sleeping. In that case the worker would process that new high priority task after it's done with the current one. The above code assumes that scenario will not happen, which seems reasonable based on the problem description. If that might happen you can alter the sleep code to repeatedly poll if the task at the front of the priority queue has come due. The disadvantage with a polling approach like that is that it would be more CPU intensive.
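A sketch of that polling variant: the worker drains submitted tasks into a local heap and sleeps in short slices, so a newly submitted, more urgent task is noticed while waiting. The 5 ms `poll_interval` and the `None` shutdown sentinel are assumptions for illustration, not part of the answer above:

```python
import heapq
from datetime import datetime, timedelta
from queue import Empty, Queue
from threading import Thread
from time import sleep

results = []


def do_it(x):
    results.append(x)


def thread_worker(q, poll_interval=0.005):
    due = []  # local heap of (when, seq, item), ordered by due time
    seq = 0
    while True:
        # Drain any newly submitted tasks without blocking.
        try:
            while True:
                when, item = q.get_nowait()
                heapq.heappush(due, (when, seq, item))
                seq += 1
        except Empty:
            pass
        now = datetime.now()
        # Run every task whose time has come, earliest first.
        while due and due[0][0] <= now:
            _, _, item = heapq.heappop(due)
            if item is None:
                return  # sentinel: shut the worker down
            do_it(item)
        sleep(poll_interval)  # short sleep so urgent new tasks are noticed


q = Queue()
t = Thread(target=thread_worker, args=(q,))
t.start()
now = datetime.now()
for i in range(5):
    q.put((now + timedelta(seconds=i * 0.010), i))
q.put((now + timedelta(seconds=0.1), None))  # schedule the shutdown sentinel
t.join()
```

The trade-off is as described above: the worker wakes every few milliseconds even when idle, which costs more CPU than a single long sleep.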

Also, if you can guarantee that the relative order of the tasks won't change after they've been submitted to the worker, then you can replace the priority queue with a regular queue.Queue to simplify the code somewhat.
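If the order really is fixed at submission time, the worker loop shrinks to a plain FIFO consumer. A sketch using queue.Queue, with a `None` sentinel (an assumption) to stop the worker:

```python
from queue import Queue
from threading import Thread

results = []


def do_it(x):
    results.append(x)


def thread_worker(q):
    while True:
        item = q.get()    # FIFO: tasks run in submission order
        if item is None:  # sentinel tells the worker to stop
            return
        do_it(item)
        q.task_done()


q = Queue()
t = Thread(target=thread_worker, args=(q,))
t.start()
for i in range(5):
    q.put(i)
q.put(None)
t.join()
```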

Pending do_it tasks can be cancelled by removing them from the queue before the worker dequeues them; note that queue.PriorityQueue has no removal method, so in practice you would mark an item as cancelled and have the worker skip it.

The above code was tested with the following mock definitions:

def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)

An alternative approach that uses no extra threads would be to use asyncio, as pointed out by smcjones. Here's an approach using asyncio that calls do_it at specific times in the future by using loop.call_later:

import asyncio


def do_it(x):
    print(x)


async def wait_for_something_else():
    await asyncio.sleep(5)


async def main():
    loop = asyncio.get_event_loop()
    while True:
        for i in range(20):
            loop.call_later(i * 0.010, do_it, i)
        await wait_for_something_else()

asyncio.run(main())

These do_it tasks can be cancelled using the handle returned by loop.call_later.
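A short sketch of that cancellation: loop.call_later returns a TimerHandle, and calling .cancel() on it before the callback runs prevents it from firing (the 0.2-second grace sleep is an arbitrary choice for the example):

```python
import asyncio

fired = []


def do_it(x):
    fired.append(x)


async def main():
    loop = asyncio.get_running_loop()
    # Schedule five calls 10 ms apart and keep the handles.
    handles = [loop.call_later(i * 0.010, do_it, i) for i in range(5)]
    handles[3].cancel()       # cancel one call before it runs
    await asyncio.sleep(0.2)  # let the remaining callbacks fire


asyncio.run(main())
```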

This approach will, however, require either switching over your program to use asyncio throughout, or running the asyncio event loop in a separate thread.

answered Oct 15 '22 by Will Da Silva


It sounds like you want something to be non-blocking and asynchronous, but also single-processed and single-threaded (one thread dedicated to do_it).

If this is the case, and especially if any networking is involved, it is probably worthwhile to use asyncio instead, so long as you're not doing serious blocking I/O on your main thread.

It's designed to handle non-blocking operations, and allows you to make all of your requests without waiting for a response.

Example:

import asyncio


async def do_it(i):  # must be a coroutine to be wrapped by asyncio.create_task
    print(i)


async def main():  # await is only valid inside an async function
    while True:
        tasks = []
        for i in range(20):
            tasks.append(asyncio.create_task(do_it(i)))
        await wait_for_something_else()
        for task in tasks:
            await task

asyncio.run(main())

Given the time spent waiting on blocking I/O (seconds), you'll probably waste more time managing threads than you will save by spawning a separate thread for these other operations.

answered Oct 15 '22 by smcjones