I've recently started using Python's multithreading and multiprocessing features.
I tried to write code that uses a producer/consumer approach: read chunks from a JSON log file, write those chunks as events into a queue, and then start a set of processes that poll events (file chunks) from that queue and process each one, printing out the results.
My intent is to start the processes first and leave them waiting for events to start arriving in the queue.
I'm currently using the following code, which seems to work, assembled from bits and pieces of examples I found:
import re, sys
from multiprocessing import Process, Queue

def process(file, chunk):
    # Re-open the file in the worker and read only the assigned chunk
    with open(file, "rb") as f:
        f.seek(chunk[0])
        for entry in pat.findall(f.read(chunk[1])):
            print(entry)

def getchunks(file, size=1024*1024):
    # Yield (start, length) pairs aligned to line boundaries
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()  # skip forward to the next line ending
            yield start, f.tell() - start
            if not s:
                break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if queueEvent is None:
            queue.put(None)  # re-queue the sentinel so the other workers stop too
            break
        process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    # Bytes pattern, since the file is opened in binary mode; the workers
    # inherit this global only when processes are started by forking (Linux)
    pat = re.compile(rb".*?\n")
    queue = Queue()
    for w in range(6):  # range, not the Python 2-only xrange
        p = Process(target=processingChunks, args=(queue,))
        p.start()
    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    queue.put(None)
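As an aside, here's a minimal, self-contained sketch of the same sentinel pattern with a detail my code above leaves out: keeping the Process handles and joining them so the parent explicitly waits for the workers (toy worker and toy data, not my real chunking code):

from multiprocessing import Process, Queue

def worker(queue):
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # propagate the sentinel to the other workers
            break
        print(item)

if __name__ == "__main__":
    queue = Queue()
    workers = [Process(target=worker, args=(queue,)) for _ in range(4)]
    for p in workers:
        p.start()
    for item in ("a", "b", "c"):
        queue.put(item)
    queue.put(None)  # a single sentinel is enough; workers re-queue it
    for p in workers:
        p.join()  # block until every worker has seen the sentinel and exited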
However, I wanted to learn how to use concurrent.futures' ProcessPoolExecutor to achieve the same results asynchronously, using Future result objects.
My first attempt involved an external queue, created with the multiprocessing Manager, which I would pass to the worker processes for polling.
However, this doesn't seem to work, and I reckon it is possible that this is not how ProcessPoolExecutor was designed to work, as it appears to use an internal queue of its own.
This is the code I used:
import concurrent.futures
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Manager

def process(file, chunk):
    entries = []
    with open(file, "rb") as f:
        f.seek(chunk[0])
        for entry in pat.findall(f.read(chunk[1])):
            entries.append(entry)
    return entries

def getchunks(file, size=1024*1024):
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()  # skip forward to the next line ending
            yield start, f.tell() - start
            if not s:
                break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if queueEvent is None:
            queue.put(None)
            break
        return process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(rb".*?\n")  # bytes pattern to match the binary reads
    procManager = Manager()
    queue = procManager.Queue()
    with concurrent.futures.ProcessPoolExecutor(max_workers=6) as executor:
        futureResults = []
        for i in range(6):
            future_result = executor.submit(processingChunks, queue)
            futureResults.append(future_result)
        for complete in as_completed(futureResults):
            res = complete.result()
            for i in res:
                print(i)
        for chunk in getchunks(testFile):
            queue.put((testFile, chunk))
            print(queue.qsize())
        queue.put(None)
I'm unable to obtain any results with this, so clearly I'm doing something wrong and there's something about the concept I haven't understood.
Can you please give me a hand understanding how I could implement this?
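For reference, here's a minimal sketch (with a hypothetical square function, not my actual code) of the submit-per-task flow that ProcessPoolExecutor appears to be designed around: each submit() call hands the arguments to the pool's own internal queue, so no external Manager queue is needed:

from concurrent.futures import ProcessPoolExecutor, as_completed

def square(x):  # stand-in for the real chunk processor
    return x * x

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(square, n) for n in range(4)]
        for fut in as_completed(futures):
            print(fut.result())  # 0, 1, 4, 9 in completion order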
Thanks to Blckknght, whose reply pushed me in the right direction. Here's a possible solution to my initial question:
#!/usr/bin/python
import concurrent.futures
from concurrent.futures import as_completed
import re, sys

def process(event):
    entries = []
    fl, chunk, pat = event  # each event is (file, (start, length), pattern)
    with open(fl, "rb") as f:
        f.seek(chunk[0])
        for entry in pat.findall(f.read(chunk[1])):
            entries.append(entry)
    return entries

def getchunks(file, pat, size=1024*1024):
    with open(file, "rb") as f:
        while True:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()  # skip forward to the next line ending
            yield (file, (start, f.tell() - start), pat)
            if not s:
                break

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(rb".*?\n")  # bytes pattern; compiled patterns pickle fine
    results = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for res in (executor.submit(process, event) for event in getchunks(testFile, pat)):
            results.append(res)
        for complete in as_completed(results):
            for entry in complete.result():
                print('Event result: %s' % entry)
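As a possible simplification (my own assumption, not something from Blckknght's reply), executor.map could replace the submit/as_completed pattern when consuming results in input order is acceptable. A sketch reusing the same process and getchunks helpers from above:

with concurrent.futures.ProcessPoolExecutor() as executor:
    # map applies process() to each event and yields the result lists in order
    for entries in executor.map(process, getchunks(testFile, pat)):
        for entry in entries:
            print('Event result: %s' % entry)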