 

How do I use threads on a generator while keeping the order?

I have a simple code that runs a GET request per each item in the generator that I'm trying to speed up:

def stream(self, records):
    # type(records) = <type 'generator'>
    for record in records:
        # record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
        output = rest_api_lookup(record[self.input_field])

        record.update(output)
        yield record

Right now this runs on a single thread and takes forever since each REST call waits until the previous REST call finishes.

I have used multithreading on a list in Python before, following this great answer (https://stackoverflow.com/a/28463266/1150923), but I'm not sure how to reuse the same strategy on a generator instead of a list.
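For context, the list-based pattern in question looks roughly like the sketch below (my paraphrase from memory; the linked answer has the original, and rest_api_lookup is the function from my code above):

from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool

urls = ['http://example.com/1', 'http://example.com/2']  # illustrative input

pool = ThreadPool(4)                        # 4 worker threads
results = pool.map(rest_api_lookup, urls)   # blocks; results keep input order
pool.close()
pool.join()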

I had some advice from a fellow developer who recommended that I break the generator into 100-element lists and then close the pool, but I don't know how to create these lists from the generator.

I also need to keep the original order since I need to yield record in the right order.
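For reference, one way to build such fixed-size lists from a generator is itertools.islice; a minimal sketch (the function name is illustrative, not from the original post):

from itertools import islice

def chunks(generator, size=100):
    """Yield successive lists of up to `size` items from a generator."""
    iterator = iter(generator)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk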

hobbes3 asked Feb 10 '18 at 19:02

3 Answers

I assume you don't want to turn your generator records into a list first. One way to speed up your processing is to pass the records into a ThreadPoolExecutor chunk-wise. The executor will process your rest_api_lookup concurrently for all items of the chunk. Then you just need to "unchunk" your results. Here's some running sample code (which does not use classes, sorry, but I hope it shows the principle):

from concurrent.futures import ThreadPoolExecutor
from time import sleep

pool = ThreadPoolExecutor(8) # 8 threads, adjust to taste and # of cores

def records():
    # simulates records generator
    for i in range(100):
        yield {'a': i}


def rest_api_lookup(a):
    # simulates REST call :)
    sleep(0.1)
    return {'b': -a}


def stream(records):
    def update_fun(record):
        output = rest_api_lookup(record['a'])
        record.update(output)
        return record
    chunk = []
    for record in records:
        # submit update_fun(record) into pool, keep resulting Future
        chunk.append(pool.submit(update_fun, record))
        if len(chunk) == 8:  # chunk size matches the pool size, keeping all threads busy
            yield chunk
            chunk = []
    if chunk:
        yield chunk

def unchunk(chunk_gen):
    """Flattens a generator of Future chunks into a generator of Future results."""
    for chunk in chunk_gen:
        for f in chunk:
            yield f.result() # get result from Future

# Now iterate over all results in same order as generated by records()    
for result in unchunk(stream(records())):
    print(result)

HTH!

Update: I added a sleep to the simulated REST call, to make it more realistic. This chunked version finishes on my machine in 1.5 seconds. The sequential version takes 10 seconds (as is to be expected, 100 * 0.1s = 10s).
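As a side note (my addition, not something the code above uses): Executor.map also returns results in input order, so if update_fun is hoisted out of stream, the manual chunking can be skipped entirely. The trade-off is that map submits a task for every record up front, so it drains the whole generator immediately instead of consuming it chunk by chunk:

# Sketch: same ordering guarantee via Executor.map (assumes update_fun is
# defined at module level). map() creates a future per record right away,
# so the entire input generator is consumed up front.
for result in pool.map(update_fun, records()):
    print(result)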

dnswlt answered Oct 22 '22 at 08:10

Here's an example of how you can do it with concurrent.futures:

from concurrent import futures
from concurrent.futures import ThreadPoolExecutor

class YourClass(object):

    def stream(self, records):
        for record in records:
            output = rest_api_lookup(record[self.input_field])
            record.update(output)
        # Process your list and return the result. Use return instead of
        # yield here: a generator function submitted to an executor returns
        # a generator object without running, so Future.result() would
        # never carry the data.
        return {"result_key": "whatever the result is"}

    def run_parallel(self):
        """ Use this method to do the parallel processing """

        # The important part - concurrent futures 
        # - set number of workers as the number of jobs to process - suggest 4, but may differ 
        #   this will depend on how many threads you want to run in parallel
        with ThreadPoolExecutor(4) as executor:
            # Use list jobs for concurrent futures
            # Use list scraped_results for results
            jobs = []
            parallel_results = []

            # Pass some keyword arguments if needed - per job  
            record1 = {} # your values for record1 - if need more - create
            record2 = {} # your values for record2 - if need more - create
            record3 = {} # your values for record3 - if need more - create
            record4 = {} # your values for record4 - if need more - create

            list_of_records = [[record1, record2], [record3, record4],]


            for records in list_of_records:
                # Here we iterate 'number of records' times, could be different
                # We're adding stream, could be different function per call
                jobs.append(executor.submit(self.stream, records))

            # Once parallel processing is complete, iterate over results 
            # append results to final processing without any networking
            for job in futures.as_completed(jobs):
                # Read result from future
                result = job.result()
                # Append to the list of results
                parallel_results.append(result)
            # as_completed() yields futures in completion order, so sort
            # by key afterwards to restore the original order
            parallel_results = sorted(parallel_results, key=lambda k: k['result_key'])
            # Iterate over the collected results and do whatever is needed
            for result in parallel_results:
                print("Do something with me {}".format(result))
dmitryro answered Oct 22 '22 at 09:10


The answer by dnswlt works well but can still be improved. If the requests to the REST API (or whatever else is done with each record) take a variable amount of time, some worker threads may sit idle while the slowest request of each batch is still running.

The following solution takes a generator and a function as input and applies the function to each element produced by the generator while keeping a given number of threads running (each applying the function to one element). At the same time, it still returns the results in the order of the input.

from concurrent.futures import ThreadPoolExecutor
import os
import random
import time

def map_async(iterable, func, max_workers=os.cpu_count()):
    # Generator that applies func to the input using max_workers concurrent jobs
    def async_iterator():
        iterator = iter(iterable)
        pending_results = []
        has_input = True
        thread_pool = ThreadPoolExecutor(max_workers)
        while True:
            # Submit jobs for remaining input until max_worker jobs are running
            while has_input and \
                    len([e for e in pending_results if e.running()]) \
                        < max_workers:
                try:
                    e = next(iterator)
                    print('Submitting task...')
                    pending_results.append(thread_pool.submit(func, e))
                except StopIteration:
                    print('Submitted all tasks.')
                    has_input = False

            # If there are no pending results, the generator is done
            if not pending_results:
                thread_pool.shutdown()  # clean up the worker threads
                return

            # If the oldest job is done, return its value
            if pending_results[0].done():
                yield pending_results.pop(0).result()
            # Otherwise, yield the CPU, then continue starting new jobs
            else:
                time.sleep(.01)

    return async_iterator()

def example_generator():
    for i in range(20):
        print('Creating task', i)
        yield i

def do_work(i):
    print('Starting to work on', i)
    time.sleep(random.uniform(0, 3))
    print('Done with', i)
    return i

random.seed(42)
for i in map_async(example_generator(), do_work):
    print('Got result:', i)

The commented output of a possible execution (on a machine with 8 logical CPUs):

Creating task 0
Submitting task...
Starting to work on 0
Creating task 1
Submitting task...
Starting to work on 1
Creating task 2
Submitting task...
Starting to work on 2
Creating task 3
Submitting task...
Starting to work on 3
Creating task 4
Submitting task...
Starting to work on 4
Creating task 5
Submitting task...
Starting to work on 5
Creating task 6
Submitting task...
Starting to work on 6
Creating task 7
Submitting task...
Starting to work on 7      # This point is reached quickly: 8 jobs are started before any of them finishes
Done with 1                # Job 1 is done, but since job 0 is not, the result is not returned yet
Creating task 8            # Job 1 finished, so a new job can be started
Submitting task...
Creating task 9
Starting to work on 8
Submitting task...
Done with 7
Starting to work on 9
Done with 9
Creating task 10
Submitting task...
Creating task 11
Starting to work on 10
Submitting task...
Done with 3
Starting to work on 11
Done with 2
Creating task 12
Submitting task...
Creating task 13
Starting to work on 12
Submitting task...
Done with 12
Starting to work on 13
Done with 10
Creating task 14
Submitting task...
Creating task 15
Starting to work on 14
Submitting task...
Done with 8
Starting to work on 15
Done with 13                  # Several other jobs are started and completed
Creating task 16
Submitting task...
Creating task 17
Starting to work on 16
Submitting task...
Done with 0                   # Finally, job 0 is completed
Starting to work on 17
Got result: 0
Got result: 1
Got result: 2
Got result: 3                 # Results of completed jobs are returned in input order, up to the first job that is still running
Done with 5
Creating task 18
Submitting task...
Creating task 19
Starting to work on 18
Submitting task...
Done with 16
Starting to work on 19
Done with 11
Submitted all tasks.
Done with 19
Done with 4
Got result: 4
Got result: 5
Done with 6
Got result: 6                  # Job 6 must have been a very long job; now that it's done, its result and the results of many subsequent jobs can be returned
Got result: 7
Got result: 8
Got result: 9
Got result: 10
Got result: 11
Got result: 12
Got result: 13
Done with 14
Got result: 14
Done with 15
Got result: 15
Got result: 16
Done with 17
Got result: 17
Done with 18
Got result: 18
Got result: 19

The above run took about 4.7s, while the sequential execution (setting max_workers=1) took about 23.6s. Without the optimization that avoids waiting for the slowest job in each batch, the execution takes about 5.3s. Depending on the variation of the individual job times and on max_workers, the effect of the optimization may be even larger.
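To connect this back to the question, the stream method could simply delegate to map_async. This is a sketch under the assumption that rest_api_lookup and self.input_field behave as in the original post; the class name is hypothetical:

class YourCommand(object):  # hypothetical class, standing in for the OP's
    input_field = 'data'    # assumption: the field holding the lookup key

    def stream(self, records):
        # Wrap the per-record work so map_async can apply it per element
        def lookup(record):
            record.update(rest_api_lookup(record[self.input_field]))
            return record
        # map_async yields results in input order, so record order is kept
        for record in map_async(records, lookup, max_workers=8):
            yield record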

ingomueller.net answered Oct 22 '22 at 07:10