My goal is to concurrently crawl URLs from a queue. Based on the crawling result, the queue may be extended. Here is the MWE:
import queue
from concurrent.futures import ThreadPoolExecutor
import time
def get(url): # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'
def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')
url_queue = queue.Queue()
result_queue = queue.Queue()
for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'): 
    url_queue.put(url)
with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)
while not result_queue.empty():
    data = result_queue.get()
    print(data)
In this MWE, two URLs require another crawl: 'url_with_more' and 'another_url_with_more'. They are added to the url_queue while crawling.
However, this solution ends before those two 'more' URLs are processed; after running, the url_queue still contains two entries.
How can I make sure that the ThreadPoolExecutor does not exit too early? Have I misunderstood ThreadPoolExecutor?
Your while not url_queue.empty() loop drains the queue and exits as soon as it is momentarily empty, and the with block then only waits for the futures that were already submitted, so anything put on url_queue afterwards is never picked up. You can just use the futures instead of a queue:
import time
from concurrent.futures import ThreadPoolExecutor
jobs = []  # futures for every submitted crawl; grows as new URLs are found
def get(url):
    print(f"start getting data from {url}")
    time.sleep(0.5)
    return f"data from {url}"
def crawl(url):
    data = get(url)
    if "more" in url:
        submit(url.replace("more", "less") + "_url_extended")
    return (url, data)
def submit(url):
    print("Submitting", url)
    jobs.append(executor.submit(crawl, url))
with ThreadPoolExecutor(max_workers=3) as executor:
    for url in (
        "some_url",
        "another_url",
        "url_with_more",
        "another_url_with_more",
        "last_url",
    ):
        submit(url)
    # Poll until every submitted future is done. `jobs` may keep growing
    # while we wait, because crawl() submits follow-up URLs before it
    # returns, so a finished job's children are always already in the
    # list by the time that job counts as done.
    while True:
        done_count = len([1 for job in jobs if job.done()])
        time.sleep(0.5)
        if done_count == len(jobs):
            break
    for job in jobs:
        print(job.result())
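If you would rather keep the queue-based layout from your question, here is a sketch of another option (my own variation, not tested against your real crawler) that leans on queue.Queue's built-in task tracking: every put() is later balanced by a task_done(), and url_queue.join() blocks until all of them, including URLs added mid-crawl, have been handled. A None sentinel then stops the dispatch loop:

import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor

url_queue = queue.Queue()
result_queue = queue.Queue()

def get(url):  # stand-in for the real HTTP request
    time.sleep(1)
    return f'data from {url}'

def crawl(url):
    try:
        data = get(url)
        result_queue.put(data)
        if 'more' in url:
            url_queue.put('url_extended')  # new URL is queued before task_done()
    finally:
        url_queue.task_done()  # this URL is now fully handled

for url in ('some_url', 'another_url', 'url_with_more',
            'another_url_with_more', 'last_url'):
    url_queue.put(url)

with ThreadPoolExecutor(max_workers=8) as executor:
    def dispatch():
        while True:
            url = url_queue.get()
            if url is None:  # sentinel: nothing left to crawl
                break
            executor.submit(crawl, url)

    dispatcher = threading.Thread(target=dispatch)
    dispatcher.start()
    url_queue.join()      # waits until every queued URL, old or new, is done
    url_queue.put(None)   # unblock the dispatcher so it can exit
    dispatcher.join()

while not result_queue.empty():
    print(result_queue.get())

Both approaches address the same underlying issue: something has to keep track of work that is created while other work is still in flight, which neither while not url_queue.empty() nor the executor's shutdown does on its own.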