 

Most pythonic way to break up highly branched parser

I'm working on a parser for a specific type of file that is broken up into sections by some header keyword followed by a bunch of heterogeneous data. Headers are always separated by blank lines. Something along the lines of the following:

Header_A

1 1.02345
2 2.97959
...

Header_B

1   5.1700   10.2500
2   5.0660   10.5000
...

Every header contains very different types of data and depending on certain keywords within a block, the data must be stored in different locations. The general approach I took is to have some regex that catches all of the keywords that can define a header and then iterate through the lines in the file. Once I find a match, I pop lines until I reach a blank line, storing all of the data from lines in the appropriate locations.

This is the basic structure of the code where "do stuff with current_line" will involve a bunch of branches depending on what the line contains:

headers = re.compile(r"""
    ((?P<header_a>Header_A)
    |
    (?P<header_b>Header_B))
    """, re.VERBOSE)

i = 0
while i < len(data_lines):
    match = headers.match(data_lines[i])
    if match:
        if match.group('header_a'):
            data_lines.pop(i)
            data_lines.pop(i)

            #     not end of file         not blank line
            while i < len(data_lines) and data_lines[i].strip():
                current_line = data_lines.pop(i)
                # do stuff with current_line

        elif match.group('header_b'):
            data_lines.pop(i)
            data_lines.pop(i)

            while i < len(data_lines) and data_lines[i].strip():
                current_line = data_lines.pop(i)
                # do stuff with current_line
        else:
            i += 1
    else:
        i += 1

Everything works correctly, but it amounts to a highly branched structure that I find hard to read and likely hard to follow for anyone unfamiliar with the code. It also makes it harder to keep lines under 79 characters and, more generally, doesn't feel very pythonic.

One thing I'm working on is separating the branch for each header into separate functions. This will hopefully improve readability quite a bit but...

...is there a cleaner way to perform the outer looping/matching structure? Maybe using itertools?

Also for various reasons this code must be able to run in 2.7.

asked Feb 28 '14 by CTKlein

3 Answers

You could use itertools.groupby to group the lines according to which processing function you wish to perform:

import itertools as IT

def process_a(lines):
    for line in lines:
        line = line.strip()
        if not line: continue        
        print('processing A: {}'.format(line))

def process_b(lines):
    for line in lines:
        line = line.strip()
        if not line: continue        
        print('processing B: {}'.format(line))

def header_func(line):
    if line.startswith('Header_A'):
        return process_a
    elif line.startswith('Header_B'):
        return process_b
    else: return None  # you could omit this, but it might be nice to be explicit

with open('data', 'r') as f:
    func = None  # no processing function until we've seen a header
    for key, lines in IT.groupby(f, key=header_func):
        if key is None:
            if func is not None:
                func(lines)
        else:
            func = key

Applied to the data you posted, the above code prints

processing A: 1 1.02345
processing A: 2 2.97959
processing A: ...
processing B: 1   5.1700   10.2500
processing B: 2   5.0660   10.5000
processing B: ...

The one complicated line in the code above is

for key, lines in IT.groupby(f, key=header_func):

Let's try to break it down into its component parts:

In [31]: f = open('data')

In [32]: list(IT.groupby(f, key=header_func))
Out[32]: 
[(<function __main__.process_a>, <itertools._grouper at 0xa0efecc>),
 (None, <itertools._grouper at 0xa0ef7cc>),
 (<function __main__.process_b>, <itertools._grouper at 0xa0eff0c>),
 (None, <itertools._grouper at 0xa0ef84c>)]

IT.groupby(f, key=header_func) returns an iterator. The items yielded by the iterator are 2-tuples, such as

(<function __main__.process_a>, <itertools._grouper at 0xa0efecc>)

The first item in the 2-tuple is the value returned by header_func. The second item is an iterator that yields the consecutive lines from f for which header_func(line) returned the same value.

Thus, IT.groupby is grouping the lines in f according to the return value of header_func. When the line in f is a header line -- either Header_A or Header_B -- then header_func returns process_a or process_b, the function we wish to use to process subsequent lines.

When the line in f is a header line, the group of lines returned by IT.groupby (the second item in the 2-tuple) is short and uninteresting -- it is just the header line.

We need to look in the next group for the interesting lines. For these lines, header_func returns None.

So we need to look at two 2-tuples: the first 2-tuple yielded by IT.groupby gives us the function to use, and the second 2-tuple gives the lines to which the header function should be applied.

Once you have both the function and the iterator with the interesting lines, you just call func(lines) and you're done!
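To see the grouping in action, here is a minimal sketch with string keys standing in for the processing functions (the toy data and 'A'/'B' return values are illustrative, not part of the answer's full parser):

```python
import itertools as IT

def header_func(line):
    # return a distinct key for each header line, None for data lines
    if line.startswith('Header_A'):
        return 'A'
    elif line.startswith('Header_B'):
        return 'B'
    return None

toy = ['Header_A\n', '1 1.0\n', '2 2.0\n', 'Header_B\n', '3 3.0\n']

# each header line forms its own group; the data lines after it
# form the next group, keyed by None
keys = [key for key, group in IT.groupby(toy, key=header_func)]
print(keys)  # ['A', None, 'B', None]
```

The alternation between a header key and None is exactly what the if/else in the main loop exploits: remember the function when the key is a header, apply it when the key is None.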

Notice that it would be very easy to expand this to process other kinds of headers. You would only need to write another process_* function, and modify header_func to return process_* when the line indicates to do so.


Edit: I removed the use of izip(*[iterator]*2) since it assumes the first line is a header line. The first line could be blank or a non-header line, which would throw everything off. I replaced it with some if-statements. It's not quite as succinct, but the result is a bit more robust.

answered Oct 21 '22 by unutbu

How about splitting out the logic for parsing the different headers' data into separate functions, then using a dictionary to map from a given header to the right one:

def parse_data_a(iterator):
    next(iterator) # throw away the blank line after the header
    for line in iterator:
        if not line.strip():
            break  # bail out if we find a blank line; another header is about to start
        # do stuff with each line here

# define similar functions to parse other blocks of data, e.g. parse_data_b()

# define a mapping from header strings to the functions that parse the following data
parser_for_header = {"Header_A": parse_data_a} # put other parsers in here too!

def parse(lines):
    iterator = iter(lines)
    for line in iterator:
        header = line.strip()
        if header in parser_for_header:
            parser_for_header[header](iterator)

This code uses iteration, rather than indexing to handle the lines. An advantage of this is that you can run it directly on a file in addition to on a list of lines, since files are iterable. It also makes the bounds checking very easy, since a for loop will end automatically when there's nothing left in the iterable, as well as when a break statement is hit.

Depending on what you're doing with the data you're parsing, you may need to have the individual parsers return something, rather than just going off and doing their own thing. In that case, you'll need some logic in the top-level parse function to get the results and assemble it into some useful format. Perhaps a dictionary would make the most sense, with the last line becoming:

results_dict[header] = parser_for_header[header](iterator)
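Putting those pieces together, a runnable sketch of the result-collecting variant (the choice to gather the second column as floats is an assumption for illustration, not part of the answer above):

```python
def parse_data_a(iterator):
    next(iterator)  # throw away the blank line after the header
    values = []
    for line in iterator:
        if not line.strip():
            break  # blank line: this block is finished
        # assumption: collect the second whitespace-separated column
        values.append(float(line.split()[1]))
    return values

parser_for_header = {'Header_A': parse_data_a}

def parse(lines):
    results_dict = {}
    iterator = iter(lines)
    for line in iterator:
        header = line.strip()
        if header in parser_for_header:
            results_dict[header] = parser_for_header[header](iterator)
    return results_dict

sample = ['Header_A', '', '1 1.02345', '2 2.97959', '']
print(parse(sample))  # {'Header_A': [1.02345, 2.97959]}
```

Because parse_data_a consumes from the same iterator the outer loop uses, the outer loop naturally resumes at whatever line follows the block.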
answered Oct 21 '22 by Blckknght


You can do it with the send method of generators as well :)

data_lines = [
    'Header_A   ',
    '',
    '',
    '1 1.02345',
    '2 2.97959',
    '',
]

def process_header_a(line):
    while True:
        line = yield line
        # process line
        print 'A', line

header_processors = {
    'Header_A': process_header_a(None),
}

current_processor = None
for line in data_lines:
    line = line.strip()
    if line in header_processors:
        current_processor = header_processors[line]
        current_processor.send(None)
    elif line:
        current_processor.send(line)    

for processor in header_processors.values():
    processor.close()
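The priming step (the initial send(None)) is what advances a generator to its first yield so that later send calls can deliver values into it. A minimal standalone sketch of that mechanic (the collector coroutine here is a made-up example, not the answer's processor):

```python
def collector():
    # a coroutine that accumulates every line sent into it
    lines = []
    while True:
        line = yield lines
        lines.append(line)

c = collector()
next(c)  # prime: run up to the first yield (equivalent to c.send(None))
c.send('1 1.02345')
result = c.send('2 2.97959')
print(result)  # ['1 1.02345', '2 2.97959']
```

Sending into an unprimed generator raises TypeError, which is why the main loop above primes each processor with send(None) when its header is first seen.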

You can remove all if conditions from the main loop if you replace

current_processor = None
for line in data_lines:
    line = line.strip()
    if line in header_processors:
        current_processor = header_processors[line]
        current_processor.send(None)
    elif line:
        current_processor.send(line)    

with

map(next, header_processors.values())
current_processor = header_processors['Header_A']
for line in data_lines:
    line = line.strip()
    current_processor = header_processors.get(line, current_processor)
    line and line not in header_processors and current_processor.send(line)
answered Oct 21 '22 by Umair Khan