I use this method to schedule a few dozen calls to do_it (fewer than a thousand) at different times in the future:
import threading

timers = []
while True:
    for i in range(20):
        t = threading.Timer(i * 0.010, do_it, [i])  # I pass the parameter i to function do_it
        t.start()
        timers.append(t)  # so that they can be cancelled if needed
    wait_for_something_else()  # this can last from 5 ms to 20 seconds
Each do_it call runs very quickly (much less than 0.1 ms) and is non-blocking. I would like to avoid spawning hundreds of new threads for such a simple task.
How could I do this with only one additional thread for all do_it calls?
Is there a simple way to do this in Python using only the standard library, without third-party libraries?
As I understand it, you want a single worker thread that can process submitted tasks, not in the order they are submitted, but rather in some prioritized order. This seems like a job for the thread-safe queue.PriorityQueue.
from dataclasses import dataclass, field
from threading import Thread
from typing import Any
from queue import PriorityQueue

@dataclass(order=True)
class PrioritizedItem:
    priority: float  # float, because i * 0.010 is passed below
    item: Any = field(compare=False)  # excluded from ordering

def thread_worker(q: PriorityQueue[PrioritizedItem]):
    while True:
        do_it(q.get().item)  # blocks until an item is available
        q.task_done()

q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    for i in range(20):
        q.put(PrioritizedItem(priority=i * 0.010, item=i))
    wait_for_something_else()
This code assumes you want to run forever. If not, you can add a timeout to the q.get call in thread_worker and return when the queue.Empty exception is raised because the timeout expired. That way you'll be able to join the queue and the thread once all the jobs have been processed and the timeout has expired.
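As a minimal sketch of that timeout variant: the `idle_timeout` parameter, the `results` list, and the mock `do_it` are assumptions made for illustration and are not part of the code above.

```python
import queue
import threading
from dataclasses import dataclass, field
from typing import Any

results = []  # stands in for whatever the real do_it does

def do_it(x):  # mock version of do_it
    results.append(x)

@dataclass(order=True)
class PrioritizedItem:
    priority: float
    item: Any = field(compare=False)

def thread_worker(q, idle_timeout):
    while True:
        try:
            entry = q.get(timeout=idle_timeout)
        except queue.Empty:
            return  # nothing arrived within idle_timeout seconds, so stop
        do_it(entry.item)
        q.task_done()

q = queue.PriorityQueue()
for i in reversed(range(5)):  # submitted in reverse, processed by priority
    q.put(PrioritizedItem(priority=i * 0.010, item=i))

t = threading.Thread(target=thread_worker, args=(q, 0.2))
t.start()
q.join()  # returns once every submitted item has been processed
t.join()  # returns once the worker has been idle for 0.2 s
```

The timeout value is a trade-off: a short one lets the worker exit promptly, but it must be longer than the longest gap you expect between submissions.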
If you want to wait until some specific time in the future to run the tasks, it gets a bit more complicated. Here's an approach that extends the above approach by sleeping in the worker thread until the specified time has arrived, but be aware that time.sleep is only as accurate as your OS allows it to be.
from dataclasses import astuple, dataclass, field
from datetime import datetime, timedelta
from time import sleep
from threading import Thread
from typing import Any
from queue import PriorityQueue

@dataclass(order=True)
class TimedItem:
    when: datetime
    item: Any = field(compare=False)

def thread_worker(q: PriorityQueue[TimedItem]):
    while True:
        when, item = astuple(q.get())
        sleep_time = (when - datetime.now()).total_seconds()
        if sleep_time > 0:
            sleep(sleep_time)
        do_it(item)
        q.task_done()

q = PriorityQueue()
t = Thread(target=thread_worker, args=(q,))
t.start()
while True:
    now = datetime.now()
    for i in range(20):
        q.put(TimedItem(when=now + timedelta(seconds=i * 0.010), item=i))
    wait_for_something_else()
To address this problem using only a single extra thread, we have to sleep in that thread, so a new task with a higher priority (an earlier due time) could arrive while the worker is sleeping. In that case the worker would only process the new task after finishing the one it is currently waiting on. The above code assumes that scenario will not happen, which seems reasonable based on the problem description. If it might happen, you can change the sleep code to repeatedly poll whether the task at the front of the priority queue has come due. The disadvantage of polling is that it is more CPU intensive.
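If late-arriving earlier tasks are a real concern, one alternative to polling is to replace the sleep with a timed wait on a threading.Condition, so that a submission wakes the worker and it re-checks which task is due first. The following is only a sketch of that swapped-in technique, not the code above: the class name `SingleThreadScheduler` and its methods are hypothetical, and it uses heapq plus time.monotonic instead of queue.PriorityQueue and datetime.

```python
import heapq
import itertools
import threading
import time

class SingleThreadScheduler:
    """Hypothetical helper: runs callbacks at their due time on one worker thread."""

    def __init__(self):
        self._heap = []                # entries: (due, seq, func, args)
        self._seq = itertools.count()  # tie-breaker, so functions are never compared
        self._cond = threading.Condition()
        self._stopped = False
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def call_later(self, delay, func, *args):
        with self._cond:
            due = time.monotonic() + delay
            heapq.heappush(self._heap, (due, next(self._seq), func, args))
            self._cond.notify()  # wake the worker so it re-checks the earliest task

    def stop(self):
        # Pending tasks that are not yet due are dropped.
        with self._cond:
            self._stopped = True
            self._cond.notify()
        self._thread.join()

    def _run(self):
        while True:
            with self._cond:
                while not self._stopped:
                    if not self._heap:
                        self._cond.wait()  # nothing scheduled yet
                        continue
                    remaining = self._heap[0][0] - time.monotonic()
                    if remaining <= 0:
                        break  # earliest task is due
                    self._cond.wait(timeout=remaining)
                if self._stopped:
                    return
                _, _, func, args = heapq.heappop(self._heap)
            func(*args)  # run outside the lock so submitters are not blocked

results = []
sched = SingleThreadScheduler()
sched.call_later(0.20, results.append, "late")
sched.call_later(0.05, results.append, "early")  # submitted second, due first
time.sleep(0.5)
sched.stop()
```

Because the worker waits on the condition rather than sleeping unconditionally, it does not burn CPU while idle, yet it still reacts when an earlier task is submitted.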
Also, if you can guarantee that the relative order of the tasks won't change after they've been submitted to the worker, then you can replace the priority queue with a regular queue.Queue to simplify the code somewhat.
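These do_it tasks can be cancelled by removing them from the queue before the worker picks them up. Note that queue.PriorityQueue has no public method for removing an arbitrary item, so this needs some extra bookkeeping on your side.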
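One way to get the effect of removal without touching the queue's internals is to mark an entry as cancelled and let the worker skip it. The sketch below assumes a `cancelled` field and a `None` sentinel for shutdown, neither of which appears in the code above, and it uses time.monotonic deadlines instead of datetime for brevity.

```python
import queue
import threading
import time
from dataclasses import dataclass, field
from typing import Any

results = []

def do_it(x):  # mock version of do_it
    results.append(x)

@dataclass(order=True)
class TimedItem:
    when: float  # time.monotonic() deadline
    item: Any = field(compare=False)
    cancelled: bool = field(default=False, compare=False)  # assumed extra field

def thread_worker(q):
    while True:
        entry = q.get()
        if entry.item is None:  # hypothetical shutdown sentinel
            q.task_done()
            return
        delay = entry.when - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        if not entry.cancelled:  # skip entries cancelled while they waited
            do_it(entry.item)
        q.task_done()

q = queue.PriorityQueue()
now = time.monotonic()
entries = [TimedItem(when=now + i * 0.010, item=i) for i in range(5)]
for entry in entries:
    q.put(entry)
entries[3].cancelled = True  # "cancel" task 3 by flagging it
q.put(TimedItem(when=float("inf"), item=None))  # sorts last, stops the worker

t = threading.Thread(target=thread_worker, args=(q,))
t.start()
t.join()
```

The cancelled entry still occupies a slot in the queue until the worker reaches it, which is usually acceptable for a few dozen short-lived tasks.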
The above code was tested with the following mock definitions:
def do_it(x):
    print(x)

def wait_for_something_else():
    sleep(5)
An alternative approach that uses no extra threads would be to use asyncio, as pointed out by smcjones. Here's an approach using asyncio that calls do_it at specific times in the future by using loop.call_later:
import asyncio

def do_it(x):
    print(x)

async def wait_for_something_else():
    await asyncio.sleep(5)

async def main():
    # get_running_loop is the preferred call from inside a coroutine
    loop = asyncio.get_running_loop()
    while True:
        for i in range(20):
            loop.call_later(i * 0.010, do_it, i)
        await wait_for_something_else()

asyncio.run(main())
These do_it tasks can be cancelled using the handle returned by loop.call_later.
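A short sketch of cancelling through the handle, using a mock do_it that records its argument in a `results` list (an assumption for illustration) and a fixed 0.2 s sleep in place of wait_for_something_else:

```python
import asyncio

results = []

def do_it(x):  # mock version of do_it
    results.append(x)

async def main():
    loop = asyncio.get_running_loop()
    # Keep the handles so individual calls can be cancelled later.
    handles = [loop.call_later(i * 0.010, do_it, i) for i in range(5)]
    handles[2].cancel()  # the call scheduled for i == 2 will never run
    await asyncio.sleep(0.2)  # give the remaining callbacks time to fire

asyncio.run(main())
```

Cancelling a handle whose callback has already run has no effect, so it is safe to cancel every stored handle when discarding a batch.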
This approach will, however, require either switching over your program to use asyncio throughout, or running the asyncio event loop in a separate thread.
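For the second option, here is a rough sketch of running the event loop in one dedicated thread while the rest of the program stays synchronous. The `schedule` helper and the `results` list are hypothetical names introduced for this example; since loop.call_later must be called from the loop's own thread, the helper goes through loop.call_soon_threadsafe.

```python
import asyncio
import threading
import time

results = []

def do_it(x):  # mock version of do_it
    results.append(x)

loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()

def schedule(delay, func, *args):
    # Hypothetical helper: hop onto the loop's thread, then schedule there.
    loop.call_soon_threadsafe(loop.call_later, delay, func, *args)

for i in range(5):
    schedule(i * 0.010, do_it, i)

time.sleep(0.3)  # stands in for wait_for_something_else()

# Shut down: stop the loop from its own thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

Note that this helper does not return the timer handle, so supporting cancellation would need extra plumbing to pass the handle back from the loop thread.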
It sounds like you want something to be non-blocking and asynchronous, but also single-processed and single-threaded (one thread dedicated to do_it).
If this is the case, and especially if any networking is involved, so long as you're not actively doing serious I/O on your main thread, it is probably worthwhile using asyncio instead.
It's designed to handle non-blocking operations, and allows you to make all of your requests without waiting for a response.
Example:
import asyncio

# do_it and wait_for_something_else must be coroutines (async def) here
async def main():
    while True:
        tasks = []
        for i in range(20):
            tasks.append(asyncio.create_task(do_it(i)))
        await wait_for_something_else()
        for task in tasks:
            await task

asyncio.run(main())
Given that the blocking I/O takes seconds, you'll probably spend more time managing threads than you would save by running these other operations in a separate thread.