I want to quickly bzip2 compress several hundred gigabytes of data using my 8 core , 16 GB ram workstation. Currently I am using a simple python script to compress a whole directory tree using bzip2 and an os.system call coupled to an os.walk call.
I see that the bzip2 only uses a single cpu while the other cpus remain relatively idle.
I am a newbie in queue and threaded processes . But I am wondering how I can implement this such that I can have four bzip2 running threads (actually I guess os.system threads ), each using probably their own cpu , that deplete files from a queue as they bzip them.
My single thread script is pasted here .
import os
import sys
for roots, dirlist , filelist in os.walk(os.curdir):
for file in [os.path.join(roots,filegot) for filegot in filelist]:
if "bz2" not in file:
print "Compressing %s" % (file)
os.system("bzip2 %s" % file)
print ":DONE"
Use the subprocess
module to spawn several processes at once. If N of them are running (N should a bit bigger than the number of CPUs you have, say 3 for 2 cores, 10 for 8), wait for one to terminate and then start another one.
Note that this might not help much since there will be a lot of disk activity which you can't parallelize. A lot of free RAM for caches helps.
Try this code from MRAB on comp.lang.python:
import os
import sys
from threading import Thread, Lock
from Queue import Queue
def report(message):
mutex.acquire()
print message
sys.stdout.flush()
mutex.release()
class Compressor(Thread):
def __init__(self, in_queue, out_queue):
Thread.__init__(self)
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
while True:
path = self.in_queue.get()
sys.stdout.flush()
if path is None:
break
report("Compressing %s" % path)
os.system("bzip2 %s" % path)
report("Done %s" % path)
self.out_queue.put(path)
in_queue = Queue()
out_queue = Queue()
mutex = Lock()
THREAD_COUNT = 4
worker_list = []
for i in range(THREAD_COUNT):
worker = Compressor(in_queue, out_queue)
worker.start()
worker_list.append(worker)
for roots, dirlist, filelist in os.walk(os.curdir):
for file in [os.path.join(roots, filegot) for filegot in filelist]:
if "bz2" not in file:
in_queue.put(file)
for i in range(THREAD_COUNT):
in_queue.put(None)
for worker in worker_list:
worker.join()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With