
Processing single file from multiple processes

I have a single big text file. I want to process each line (do some operations on it) and store the results in a database. Since a simple single-process program is taking too long, I want to do the work with multiple processes or threads. Each thread/process should read DIFFERENT data (different lines) from that single file, do some operations on its lines, and put the results in the database, so that in the end all of the data has been processed and my database holds the data I need.

But I can't figure out how to approach this.

585 likes
asked Jun 25 '12 19:06 by pranavk



1 Answer

What you are looking for is a Producer/Consumer pattern.

Basic threading example

Here is a basic example using the threading module (instead of multiprocessing):

    import queue
    import sys
    import threading


    def do_work(in_queue, out_queue):
        while True:
            item = in_queue.get()
            # process
            result = item
            out_queue.put(result)
            in_queue.task_done()


    if __name__ == "__main__":
        work = queue.Queue()
        results = queue.Queue()
        total = 20

        # start four workers
        for i in range(4):
            t = threading.Thread(target=do_work, args=(work, results))
            t.daemon = True
            t.start()

        # produce data
        for i in range(total):
            work.put(i)

        # block until every item has been marked done
        work.join()

        # get the results
        for i in range(total):
            print(results.get())

        sys.exit()

You wouldn't share the file object with the threads. Instead, you would produce work for them by putting lines of data on the queue. Each thread then picks up a line, processes it, and puts the result on the output queue.
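To make that concrete, here is a minimal sketch of the same pattern fed with lines instead of integers. The names process, worker and process_lines, and the max_pending limit, are hypothetical and chosen for illustration. It assumes the per-line work can be expressed as a plain function and that the results fit in memory:

```python
import queue
import threading


def process(line):
    # Placeholder for the real per-line work (hypothetical).
    return line.strip().upper()


def worker(in_queue, out_queue):
    while True:
        item = in_queue.get()
        if item is None:  # sentinel: no more work for this thread
            in_queue.task_done()
            return
        line_no, line = item
        out_queue.put((line_no, process(line)))
        in_queue.task_done()


def process_lines(lines, num_workers=4, max_pending=100):
    """Run process() over lines with worker threads; return results in input order."""
    work = queue.Queue(maxsize=max_pending)  # bounded, so the producer cannot race ahead
    results = queue.Queue()
    threads = [
        threading.Thread(target=worker, args=(work, results))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()

    # Producer: only this thread touches the line source.
    for numbered_line in enumerate(lines):
        work.put(numbered_line)
    for _ in threads:
        work.put(None)  # one sentinel per worker
    for t in threads:
        t.join()

    # All workers have exited, so the results queue is complete.
    collected = []
    while not results.empty():
        collected.append(results.get())
    return [processed for _, processed in sorted(collected)]


if __name__ == "__main__":
    sample = ["alpha\n", "beta\n", "gamma\n"]
    print(process_lines(sample, num_workers=2))
```

An open file can be passed directly as lines, since iterating over it yields one line at a time. Only the producer loop reads from it, so the workers never compete for the file position.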

There are some more advanced facilities built into the multiprocessing module for sharing data, such as lists and a special kind of Queue. There are trade-offs between multiprocessing and threads, and the right choice depends on whether your work is CPU bound or IO bound.
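As a rough illustration of that trade-off: in standard CPython builds, threads do not run Python bytecode in parallel because of the global interpreter lock, so CPU-heavy work usually benefits from processes, while work that mostly waits on a database or network is often fine with threads. The sketch below uses concurrent.futures from the standard library rather than the modules shown in this answer. make_executor and parse_line are hypothetical names:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def make_executor(cpu_bound, workers=4):
    """Pick an executor type based on where the per-line time is spent."""
    if cpu_bound:
        # Separate processes can use multiple cores for pure-Python computation.
        return ProcessPoolExecutor(max_workers=workers)
    # Threads are lighter and work well while waiting on a database or network.
    return ThreadPoolExecutor(max_workers=workers)


def parse_line(line):
    # Hypothetical per-line work: count whitespace-separated fields.
    return len(line.split())


if __name__ == "__main__":
    lines = ["one two three\n", "four five\n"]
    with make_executor(cpu_bound=False, workers=2) as executor:
        print(list(executor.map(parse_line, lines)))
```

Both executor types share the same map/submit interface, so it is cheap to try each one and measure which is faster for your workload.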

Basic multiprocessing.Pool example

Here is a really basic example of a multiprocessing Pool:

    from multiprocessing import Pool


    def process_line(line):
        return "FOO: %s" % line


    if __name__ == "__main__":
        pool = Pool(4)
        with open('file.txt') as source_file:
            # chunk the work into batches of 4 lines at a time
            results = pool.map(process_line, source_file, 4)

        print(results)

A Pool is a convenience object that manages its own processes. Since an open file can iterate over its lines, you can pass it to pool.map(), which will loop over it and deliver lines to the worker function. map() blocks and returns the entire result when it's done. Be aware that this is an overly simplified example, and that pool.map() is going to read your entire file into memory all at once before dishing out work. If you expect to have large files, keep this in mind. There are more advanced ways to design a producer/consumer setup.
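If the file is too large for pool.map(), one option is Pool.imap(), which iterates over its input in chunks instead of first building a list of every line. A minimal sketch, reusing the placeholder process_line from the example above; process_file and the chunksize of 100 are hypothetical choices, and the pool may still read somewhat ahead of the workers:

```python
import sys
from multiprocessing import Pool


def process_line(line):
    # Placeholder transformation, as in the Pool example above.
    return "FOO: %s" % line


def process_file(path, workers=4, chunksize=100):
    """Yield processed lines in input order without listing the whole file first."""
    with Pool(workers) as pool, open(path) as source_file:
        # imap consumes the file in chunks and yields results in input order.
        for result in pool.imap(process_line, source_file, chunksize):
            yield result


# Usage: python this_script.py file.txt
if __name__ == "__main__" and len(sys.argv) == 2:
    for processed in process_file(sys.argv[1]):
        sys.stdout.write(processed)
```

Because results are yielded one at a time, the caller can write each one out as it arrives rather than holding the full result list. If order does not matter, Pool.imap_unordered() takes the same arguments.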

Manual "pool" with limit and line re-sorting

This is a manual version of Pool.map, but instead of consuming an entire iterable in one go, you can set a queue size so that you only feed it piece by piece, as fast as the workers can process. I also added line numbers so that you can track them and refer to them later if you want.

    from multiprocessing import Process, Manager
    import time
    import itertools


    def do_work(in_queue, out_list):
        while True:
            item = in_queue.get()
            line_no, line = item

            # exit signal
            if line is None:
                return

            # fake work
            time.sleep(.5)
            result = (line_no, line)

            out_list.append(result)


    if __name__ == "__main__":
        num_workers = 4

        manager = Manager()
        results = manager.list()
        # bounded queue: put() blocks once num_workers items are waiting
        work = manager.Queue(num_workers)

        # start four workers
        pool = []
        for i in range(num_workers):
            p = Process(target=do_work, args=(work, results))
            p.start()
            pool.append(p)

        # produce data, followed by one None sentinel per worker
        with open("source.txt") as f:
            iters = itertools.chain(f, (None,) * num_workers)
            for num_and_line in enumerate(iters):
                work.put(num_and_line)

        for p in pool:
            p.join()

        # get the results
        # example:  [(1, "foo"), (10, "bar"), (0, "start")]
        print(sorted(results))
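Since the question also asks about getting the processed lines into a database, here is one hedged way to finish: keep the workers free of database code and let the parent process insert the collected (line_no, line) tuples in batches. This sketch assumes SQLite via the standard sqlite3 module. The processed_lines table, the store_results name and the batch size are all hypothetical:

```python
import sqlite3


def store_results(conn, results, batch_size=1000):
    """Insert (line_no, line) tuples in batches from a single writer."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_lines "
        "(line_no INTEGER PRIMARY KEY, content TEXT)"
    )
    ordered = sorted(results)
    for start in range(0, len(ordered), batch_size):
        batch = ordered[start:start + batch_size]
        with conn:  # commits the batch, or rolls it back on error
            conn.executemany(
                "INSERT INTO processed_lines (line_no, content) VALUES (?, ?)",
                batch,
            )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    store_results(conn, [(1, "foo"), (10, "bar"), (0, "start")])
    query = "SELECT line_no, content FROM processed_lines ORDER BY line_no"
    print(conn.execute(query).fetchall())
    conn.close()
```

A single writer avoids lock contention between workers, and batching keeps the number of commits small. With a different database the connection and placeholder syntax would change, but the shape stays the same.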
175 likes
answered Sep 22 '22 13:09 by jdi