We have pretty large files, on the order of 1-1.5 GB combined (mostly log files), with raw data that is easily parseable to a csv, which is then supposed to be graphed to generate a set of graph images.
Currently, we are using bash scripts to turn the raw data into a csv file, with just the numbers that need to be graphed, and then feeding it into a gnuplot script. But this process is extremely slow. I tried to speed up the bash scripts by replacing some piped cuts, trs etc. with a single awk command; although this improved the speed, the whole thing is still very slow.
So, I am starting to believe there are better tools for this process. I am currently looking to rewrite this process in python+numpy or R. A friend of mine suggested using the JVM, and if I am to do that, I will use clojure, but am not sure how the JVM will perform.
I don't have much experience in dealing with this kind of problem, so any advice on how to proceed would be great. Thanks.
Edit: Also, I will want to store (to disk) the generated intermediate data, i.e., the csv, so I don't have to re-generate it should I decide I want a different-looking graph.
Edit 2: The raw data files have one record per line, with fields separated by a delimiter (|). Not all fields are numbers. Each field I need in the output csv is obtained by applying a certain formula to the input records, which may use multiple fields from the input data. The output csv will have 3-4 fields per line, and I need graphs that plot fields 1-2, 1-3 and 1-4, possibly as bar charts. I hope that gives a better picture.
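For illustration only, the per-record transformation described in Edit 2 might look roughly like this in Python. The field positions and the two formulas are made up (the real ones are not given here), and `record_to_row` and `convert` are hypothetical names.

```python
import csv
import io


def record_to_row(line):
    """Turn one '|'-delimited record into the values to be graphed.

    The field positions and formulas are hypothetical placeholders.
    """
    fields = line.rstrip("\n").split("|")
    # Assumed layout: key | start | end | (non-numeric) | byte count
    start, end, nbytes = float(fields[1]), float(fields[2]), float(fields[4])
    duration = end - start
    throughput = nbytes / duration if duration else 0.0
    return [fields[0], duration, nbytes, throughput]


def convert(lines, out):
    """Write one csv row per input record to the file-like object `out`."""
    writer = csv.writer(out)
    for line in lines:
        writer.writerow(record_to_row(line))
```

Writing the rows to a file on disk rather than to a pipe keeps the intermediate csv around, so the graphs can be regenerated without parsing the logs again.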
Edit 3: I have modified @adirau's script a little and it seems to be working pretty well. I have come far enough that I am reading data, sending to a pool of processor threads (pseudo processing, append thread name to data), and aggregating it into an output file, through another collector thread.
PS: I am not sure about the tagging of this question, feel free to correct it.
Python sounds like a good choice because it has a good threading API (the implementation is questionable, though), matplotlib and pylab. I'm missing some specs from your end, but maybe this could be a good starting point for you: matplotlib: async plotting with threads.
I would go for a single thread that handles the bulk disk I/O reads and feeds a synchronized queue, with a pool of threads doing the data processing (if you have fixed record lengths, things may get faster by precomputing the read offsets and passing just the offsets to the thread pool). In the disk I/O thread I would mmap the data source files and read a predefined number of bytes, plus one more read to grab the remaining bytes up to the end of the current input line; the number of bytes should be chosen somewhere near your average input line length. Next comes feeding the pool via the queue, and then the data processing / plotting that takes place in the thread pool. I don't have a good picture here (of what exactly you are plotting), but I hope this helps.
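The "read a predefined number of bytes, then read on to the end of the current line" idea can be sketched as below. This is a minimal sketch assuming Python 3 and newline-terminated records; `iter_line_aligned_chunks` is a hypothetical name, and the default chunk size is arbitrary.

```python
import mmap
import os


def iter_line_aligned_chunks(path, numbytes=1 << 20):
    """Yield bytes chunks of at least numbytes that end on a line boundary."""
    if os.path.getsize(path) == 0:
        return  # an empty file cannot be memory-mapped
    with open(path, "rb") as f, \
            mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
        size = len(m)
        start = 0
        while start < size:
            end = min(start + numbytes, size)
            # Extend the chunk to the newline that closes the current line.
            nl = m.find(b"\n", end - 1)
            end = size if nl == -1 else nl + 1
            yield m[start:end]
            start = end
```

Each chunk can then be put on the queue as one work item; a worker calls `chunk.splitlines()` and never sees a record cut in half.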
EDIT: there's file.readlines([sizehint]) to grab multiple lines at once; well, it may not be so speedy because the docs say it uses readline() internally
EDIT: some quick skeleton code
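A minimal sketch of batching with `readlines` and a size hint, assuming Python 3 file objects; `iter_batches` is a hypothetical helper name and the default hint is arbitrary.

```python
import io


def iter_batches(f, sizehint=1024 * 1024):
    """Yield lists of complete lines, each list holding roughly sizehint bytes."""
    while True:
        lines = f.readlines(sizehint)
        if not lines:
            break  # end of file reached
        yield lines
```

Because `readlines` only ever returns whole lines, each batch can go straight onto the queue. Whether this beats the mmap approach would have to be measured on the real data.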
import mmap
import sys
import threading
from collections import deque


class processor(threading.Thread):
    """
    processor gets a batch of data at a time from the diskio thread
    """
    def __init__(self, q):
        threading.Thread.__init__(self, name="plotter")
        self._queue = q

    def run(self):
        # get batched data
        while True:
            # we wait for a batch
            batch = self._queue.get()
            # feed() yields one parsed record at a time
            for data in self.feed(batch):
                self.plot(data)
            # sanitizer exceptions following, maybe

    def parseline(self, line):
        """ return a data struct ready for plotting """
        raise NotImplementedError

    def feed(self, databuf):
        # we yield one-at-a-time data structs ready to go for plotting
        for line in databuf:
            yield self.parseline(line)

    def plot(self, data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """


class sharedq(object):
    """i dont recall where i got this implementation from
    you may write a better one"""
    def __init__(self, maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        # both conditions share one lock, so put() and get() exclude each other
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize

    def put(self, item):
        with self.barrier:
            # block while the queue is full
            while len(self.queue) >= self.msz:
                self.write_c.wait()
            self.queue.append(item)
            self.read_c.notify()

    def get(self):
        with self.barrier:
            # block while the queue is empty
            while not self.queue:
                self.read_c.wait()
            item = self.queue.popleft()
            self.write_c.notify()
            return item


q = sharedq()
# approximate size of one batch of lines, in bytes
numbytes = 1024
for i in range(8):
    p = processor(q)
    p.start()
for fn in sys.argv[1:]:
    with open(fn, "r+b") as f:
        # you may want a better sizehint here
        m = mmap.mmap(f.fileno(), 0)
        batch, size = [], 0
        # mmap objects have readline() but no readlines(), so batch by hand
        for line in iter(m.readline, b""):
            batch.append(line)
            size += len(line)
            if size >= numbytes:
                q.put(batch)
                batch, size = [], 0
        if batch:
            q.put(batch)
        m.close()
# some cleanup code may be desirable (the worker threads never exit as written)
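One possible shape for the missing cleanup, together with the collector thread mentioned in Edit 3 of the question: a minimal sketch that swaps the hand-written queue for the standard library's `queue.Queue` and uses a sentinel object to shut the threads down. All names here (`STOP`, `worker`, `collector`, `run_pipeline`) are hypothetical, and `parse` stands in for whatever `parseline` ends up doing.

```python
import queue
import threading

STOP = object()  # sentinel: tells a thread that no more work is coming


def worker(inq, outq, parse):
    """Parse batches from inq and pass the parsed rows on to outq."""
    while True:
        batch = inq.get()
        if batch is STOP:
            outq.put(STOP)  # let the collector know this worker is done
            break
        outq.put([parse(line) for line in batch])


def collector(outq, nworkers, sink):
    """Gather rows from all workers into sink until every worker has stopped."""
    done = 0
    while done < nworkers:
        rows = outq.get()
        if rows is STOP:
            done += 1
        else:
            sink.extend(rows)


def run_pipeline(batches, parse, nworkers=4):
    """Feed batches through a pool of workers and return all parsed rows."""
    inq, outq = queue.Queue(maxsize=64), queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(inq, outq, parse))
               for _ in range(nworkers)]
    threads.append(threading.Thread(target=collector,
                                    args=(outq, nworkers, results)))
    for t in threads:
        t.start()
    for batch in batches:
        inq.put(batch)
    for _ in range(nworkers):
        inq.put(STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return results
```

For example, `run_pipeline([["1", "2"], ["3"]], int, nworkers=2)` returns the parsed rows, but not necessarily in input order, so include a sort key in each row if order matters. Note also that in standard CPython builds the threads will not parse in parallel when the work is CPU-bound; the threads mainly help to overlap disk I/O with processing.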