Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python: How can I use an external queue with a ProcessPoolExecutor?

I've very recently started using Python's multithreading and multiprocessing features.

I tried to write code which uses a producer/consumer approach to read chunks from a JSON log file, write those chunks as events into a queue and then start a set of processes that will poll events from that queue (file chunks) and process each one of them, printing out the results.

My intent is to start the processes first, and leave them waiting for the events to start coming into the queue.

I'm currently using this code, which seems to work, using some bits and pieces from examples I found:

import re, sys
from multiprocessing import Process, Queue

def process(file, chunk):
    """Print every log entry found in one chunk of *file*.

    ``chunk`` is a ``(start_offset, length)`` pair as produced by
    ``getchunks()``.  Relies on the module-global compiled regex ``pat``
    (inherited by worker processes via fork; would be undefined under
    spawn-based process start methods — TODO confirm target platform).
    """
    # Use a context manager so the descriptor is closed even if
    # findall()/print() raises — the original leaked the file handle.
    with open(file, "rb") as f:
        f.seek(chunk[0])
        for entry in pat.findall(f.read(chunk[1])):
            print(entry)

def getchunks(file, size=1024*1024):
    """Yield ``(start, length)`` pairs partitioning *file* into chunks.

    Each chunk is roughly *size* bytes but is extended to the next
    newline, so no log line is ever split across two chunks.
    """
    # Context manager closes the handle once the generator is exhausted
    # (the original never closed the file).
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)      # jump ahead by the nominal chunk size
            s = f.readline()     # extend the chunk to the next line ending
            yield start, f.tell() - start
            if not s:            # readline() returned b"" -> EOF
                break

def processingChunks(queue):
    """Worker loop: consume (file, chunk) events until the None sentinel."""
    while True:
        event = queue.get()
        if event is None:
            # Re-enqueue the sentinel so sibling workers also shut down.
            queue.put(None)
            return
        filename, chunk = event
        process(filename, chunk)

if __name__ == "__main__":
    # Input log file and a pattern matching one newline-terminated entry.
    testFile = "testFile.json"
    # NOTE(review): the pattern is a str while process() reads bytes ("rb");
    # together with xrange below this looks like Python 2 code — confirm.
    pat = re.compile(r".*?\n")
    queue = Queue()

    # Start the consumers first; they block on queue.get() until events
    # arrive.  Workers see `pat` only through fork-style inheritance of
    # module globals (set just above, before the processes are started).
    for w in xrange(6):
        p = Process(target=processingChunks, args=(queue,))
        p.start()

    # Producer: enqueue one (filename, (start, length)) event per chunk.
    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    # Single shutdown sentinel; each worker re-enqueues it so all six exit.
    queue.put(None)

However, I wanted to learn how to use the concurrent.futures ProcessPoolExecutor to achieve the same results in an asynchronous manner, using Future result objects.

My first attempt implied using an external queue, created with the multiprocessing Manager, which I would pass to the processes for polling.

However this doesn't seem to work and I reckon it is possible that this is not the way ProcessPoolExecutor was designed to work, as it seems to use an internal queue of its own.

I used this code:

import concurrent
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager

def process(file, chunk):
    """Return the list of log entries found in one chunk of *file*.

    ``chunk`` is a ``(start_offset, length)`` pair produced by
    ``getchunks()``.  Relies on the module-global compiled regex ``pat``.
    """
    # BUG FIX: the original `return entries` was indented inside the loop,
    # so at most the first matched entry was ever returned.  Also open the
    # file with a context manager so the handle is always closed.
    with open(file, "rb") as f:
        f.seek(chunk[0])
        entries = pat.findall(f.read(chunk[1]))
    return entries

def getchunks(file, size=1024*1024):
    """Yield ``(start, length)`` pairs partitioning *file* into chunks.

    Each chunk is roughly *size* bytes but is extended to the next
    newline, so no log line is ever split across two chunks.
    """
    # Context manager closes the handle once the generator is exhausted
    # (the original never closed the file).
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)      # jump ahead by the nominal chunk size
            s = f.readline()     # extend the chunk to the next line ending
            yield start, f.tell() - start
            if not s:            # readline() returned b"" -> EOF
                break

def processingChunks(queue):
    """Drain (file, chunk) events from *queue* until the None sentinel.

    Returns the list of all entries extracted from every chunk this
    worker handled.

    BUG FIX: the original returned on the very first event, so each
    worker processed at most one chunk and the remaining queue events —
    including the shutdown sentinel — were never consumed.
    """
    entries = []
    while True:
        event = queue.get()
        if event is None:
            queue.put(None)   # re-enqueue so sibling workers can exit too
            return entries
        entries.extend(process(event[0], event[1]))

if __name__ == "__main__":
    testFile = "testFile.json"
    # NOTE(review): str pattern + binary-mode reads suggests Python 2;
    # on Python 3 use rb".*?\n" instead — confirm target version.
    pat = re.compile(r".*?\n")
    procManager = Manager()
    queue = procManager.Queue()

    with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
        # Start the consumers first; each blocks on queue.get().
        futureResults = [executor.submit(processingChunks, queue)
                         for _ in range(6)]

        # BUG FIX: the original iterated as_completed() *before* producing
        # any events (and only enqueued chunks after the with-block, which
        # itself waits for the workers), so the program deadlocked with no
        # results.  Produce the chunks and the shutdown sentinel first.
        for chunk in getchunks(testFile):
            queue.put((testFile, chunk))
            print(queue.qsize())
        queue.put(None)

        # Now the workers can drain the queue and finish.
        for complete in as_completed(futureResults):
            for i in complete.result():
                print(i)

I'm unable to obtain any results with this, so obviously I'm doing something wrong and there's something about the concept that I didn't understand.

Can you guys please give me a hand understanding how I could implement this?

like image 610
riscado Avatar asked Oct 21 '22 02:10

riscado


1 Answer

Thanks to Blckknght, whose reply pushed me in the right direction. Here's a possible solution for my initial question:

#!/usr/bin/python
import concurrent
from concurrent.futures import as_completed
import re, sys

def process(event):
    """Extract log entries from one chunk of a file.

    ``event`` is a ``(filename, (start, length), compiled_pattern)``
    triple as produced by ``getchunks()``.  Returns the list of pattern
    matches found in the chunk.
    """
    fl, chunk, pat = event
    # Context manager closes the handle even if findall() raises — the
    # original leaked the file descriptor on every call.
    with open(fl, "rb") as f:
        f.seek(chunk[0])
        return pat.findall(f.read(chunk[1]))

def getchunks(file, pat, size=1024*1024):
    """Yield self-contained work events covering *file*.

    Each event is ``(filename, (start, length), pat)`` — everything a
    worker needs, so no global state has to reach the subprocess.  Chunks
    are roughly *size* bytes, extended to the next newline so no line is
    split across two chunks.
    """
    # Context manager closes the handle once the generator is exhausted
    # (the original never closed the file).
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)      # jump ahead by the nominal chunk size
            s = f.readline()     # extend the chunk to the next line ending
            yield (file, (start, f.tell() - start), pat)
            if not s:            # readline() returned b"" -> EOF
                break

if __name__ == "__main__":
    testFile = "testFile.json"
    # NOTE(review): the pattern is a str but process() reads the file in
    # binary mode; on Python 3 findall() on bytes would raise TypeError —
    # presumably this ran on Python 2, otherwise use rb".*?\n".  Confirm.
    pat = re.compile(r".*?\n")
    results = []

    # Submit one task per chunk: the executor's own internal queue feeds
    # the worker processes, so no external Manager queue is needed, and
    # each event carries the filename, offsets, and pattern it requires.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for res in (executor.submit(process, event) for event in getchunks(testFile, pat)):
            results.append(res)

    # Exiting the with-block waits for all workers, so every future is
    # already done here; as_completed() simply iterates them.
    for complete in as_completed(results):
        for entry in complete.result():
            print('Event result: %s' % entry)    
like image 54
riscado Avatar answered Oct 23 '22 18:10

riscado