ThreadPoolExecutor exits before queue is empty

My goal is to concurrently crawl URLs from a queue. Based on the crawling result, the queue may be extended. Here is the MWE:

import queue
from concurrent.futures import ThreadPoolExecutor
import time

def get(url): # let's assume that the HTTP magic happens here
    time.sleep(1)
    return f'data from {url}'

def crawl(url, url_queue: queue.Queue, result_queue: queue.Queue):
    data = get(url)
    result_queue.put(data)
    if 'more' in url:
        url_queue.put('url_extended')

url_queue = queue.Queue()
result_queue = queue.Queue()

for url in ('some_url', 'another_url', 'url_with_more', 'another_url_with_more', 'last_url'): 
    url_queue.put(url)


with ThreadPoolExecutor(max_workers=8) as executor:
    while not url_queue.empty():
        url = url_queue.get()
        executor.submit(crawl, url, url_queue, result_queue)

while not result_queue.empty():
    data = result_queue.get()
    print(data)

In this MWE, two URLs require another crawl: 'url_with_more' and 'another_url_with_more'. They are added to the url_queue while crawling.

However, the program finishes before those two 'more' URLs are processed; after it runs, the url_queue still contains two entries.

How can I make sure that the ThreadPoolExecutor does not exit too early? Have I misunderstood ThreadPoolExecutor?

1 Answer

You can just use the futures instead of a queue:

import time
from concurrent.futures import ThreadPoolExecutor

jobs = []  # all submitted futures, including follow-ups added while crawling


def get(url):  # stand-in for the actual HTTP request
    print(f"start getting data from {url}")
    time.sleep(0.5)
    return f"data from {url}"


def crawl(url):
    data = get(url)
    if "more" in url:
        # submit a follow-up job from inside the worker
        submit(url.replace("more", "less") + "_url_extended")
    return (url, data)


def submit(url):
    print("Submitting", url)
    jobs.append(executor.submit(crawl, url))


with ThreadPoolExecutor(max_workers=3) as executor:
    for url in (
        "some_url",
        "another_url",
        "url_with_more",
        "another_url_with_more",
        "last_url",
    ):
        submit(url)

    # Poll until every submitted job is done. The count is taken *before*
    # the sleep, so a follow-up job submitted while we sleep makes
    # len(jobs) larger than done_count and keeps the loop running.
    while True:
        done_count = len([1 for job in jobs if job.done()])
        time.sleep(0.5)
        if done_count == len(jobs):
            break

    for job in jobs:
        print(job.result())
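
If you would rather block than poll, here is a minimal, self-contained sketch of the same idea that waits on concurrent.futures.wait instead of looping with time.sleep. This is a variant I am adding for illustration, not the code from the answer above; the URL names and timings are the same placeholders. Re-checking the jobs list after each wait is what picks up follow-up jobs submitted by the workers:

import time
from concurrent.futures import ThreadPoolExecutor, wait

jobs = []  # every submitted future, including follow-ups


def get(url):
    time.sleep(0.5)  # stand-in for the HTTP request
    return f"data from {url}"


def crawl(url):
    data = get(url)
    if "more" in url:
        submit(url.replace("more", "less") + "_url_extended")
    return (url, data)


def submit(url):
    jobs.append(executor.submit(crawl, url))


with ThreadPoolExecutor(max_workers=3) as executor:
    for url in (
        "some_url",
        "another_url",
        "url_with_more",
        "another_url_with_more",
        "last_url",
    ):
        submit(url)

    pending = set(jobs)
    while pending:
        wait(pending)  # block until this batch has finished
        # any follow-up submitted by a finished job is already in `jobs`,
        # so re-checking the list picks it up for the next round
        pending = {job for job in jobs if not job.done()}

for job in jobs:
    print(job.result())

As in the polling version, jobs is a plain list shared between threads; that works here because appending to a list is effectively atomic under CPython's GIL and nothing is ever removed from the list.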