I am trying to read and process a large file in chunks with Python. I am following this blog, which proposes a very fast way of reading and processing large chunks of data spread over multiple processes. I have only slightly updated the existing code, i.e. using stat(fin).st_size over os.path.getsize. In the example I also haven't implemented multiprocessing, as the issue manifests itself in a single process as well. That makes it easier to debug.
The issue that I am having with this code is that it returns broken sentences. This makes sense: the pointers do not take line endings into account and just return some given byte size. In practice, one would assume that you could solve this by leaving out the last item in the fetched batch of lines, as that would most probably be the broken line. Unfortunately, that does not work reliably either.
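To illustrate the broken sentences, here is a minimal sketch using an in-memory file (the data and offset are hypothetical): seeking to an arbitrary byte offset usually lands in the middle of a line, so the first line read from there is a fragment.

```python
import io

# Three lines in an in-memory "file"; the offset 15 is arbitrary.
data = b"first line\nsecond line\nthird line\n"
f = io.BytesIO(data)

# Jump to an arbitrary byte position, as the chunk pointers do:
f.seek(15)
print(f.readline())  # b'nd line\n' -- a fragment of "second line"
```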
from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start

            if chunk_end > file_end:
                break


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        batch = f.read(chunk_size).splitlines()

    # changing this to batch[:-1] will result in 26 lines total
    return batch


if __name__ == '__main__':
    fin = r'data/tiny.txt'
    lines_n = 0
    for start, size in chunkify(fin):
        lines = process_batch(fin, start, size)
        # Uncomment to see broken lines
        # for line in lines:
        #     print(line)
        # print('\n')
        lines_n += len(lines)

    print(lines_n)
# 29
The code above will print 29 as the total of processed lines. When you do not return the last item of the batch, naively assuming that it is a broken line anyway, you'll get 26. The actual number of lines is 27. The testing data can be found below.
She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1
If you print out the created lines, you'll see that, indeed, broken sentences occur. I find this odd. Shouldn't f.readline() ensure that the file is read until the next line? In the output below, the empty line separates two batches. That means that you cannot check a line against the next line in a batch and remove it if it's a substring: the broken sentence belongs to a different batch than the full sentence.
...
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, r
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
...
Is there a way to get rid of these broken sentences, without removing too much?
You can download a larger test file (100,000 lines) here.
After a lot of digging, it seems that some inaccessible buffer is responsible for the inconsistent behaviour of seek, as discussed here and here. I tried out the proposed solution to use iter(f.readline, '') with seek, but that still gives me inconsistent results. I have updated my code to return the file pointer after each batch of 1500 lines, but in reality the returned batches will overlap.
from os import stat
from functools import partial


def chunkify(pfin, max_lines=1500):
    file_end = stat(pfin).st_size
    with open(pfin, 'r', encoding='utf-8') as f:
        chunk_end = f.tell()

        for idx, l in enumerate(iter(f.readline, '')):
            if idx % max_lines == 0:
                chunk_start = chunk_end
                chunk_end = f.tell()
                # yield start position and size
                yield chunk_start, chunk_end - chunk_start
                chunk_start = chunk_end

        yield chunk_start, file_end


def process_batch(pfin, chunk_start, chunk_size):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size).splitlines()
        batch = list(filter(None, chunk))

    return batch


if __name__ == '__main__':
    fin = r'data/100000-ep+gutenberg+news+wiki.txt'
    process_func = partial(process_batch, fin)
    lines_n = 0

    prev_last = ''
    for start, size in chunkify(fin):
        lines = process_func(start, size)

        if not lines:
            continue

        # print first and last ten sentences of batch
        for line in lines[:10]:
            print(line)
        print('...')
        for line in lines[-10:]:
            print(line)
        print('\n')

        lines_n += len(lines)

    print(lines_n)
An example of overlapping batches is below. The first two and a half sentences of the last batch are duplicated from the last sentences of the preceding batch. I don't know how to explain or solve this.
...
The EC ordered the SFA to conduct probes by June 30 and to have them confirmed by a certifying authority or it would deduct a part of the funding or the entire sum from upcoming EU subsidy payments.
Dinner for two, with wine, 250 lari.
It lies a few kilometres north of the slightly higher Weissmies and also close to the slightly lower Fletschhorn on the north.
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one els
For the rest we reached agreement and it was never by chance.
Chicago Blackhawks defeat Columbus Blue Jackets for 50th win
The only drawback in a personality that large is that no one else, whatever their insights or artistic pedigree, is quite as interesting.
Sajid Nadiadwala's reboot version of his cult classic "Judwaa", once again directed by David Dhawan titled "Judwaa 2" broke the dry spell running at the box office in 2017.
They warned that there will be a breaking point, although it is not clear what that would be.
...
In addition to this, I have also tried removing the readline from the original code and keeping track of a remaining, incomplete chunk. The incomplete chunk is then passed along and prepended to the next chunk. The issue that I am running into now is that, because the text is read in byte chunks, a chunk can end without completely finishing a character's bytes. This will lead to decoding errors.
from os import stat


def chunkify(pfin, buf_size=1024):
    file_end = stat(pfin).st_size
    with open(pfin, 'rb') as f:
        chunk_end = f.tell()

        while True:
            chunk_start = chunk_end
            f.seek(buf_size, 1)
            chunk_end = f.tell()
            is_last = chunk_end >= file_end
            # yield start position, size, and is_last
            yield chunk_start, chunk_end - chunk_start, is_last

            if is_last:
                break


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'r', encoding='utf-8') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.splitlines()
    batch = list(filter(None, batch))

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return batch, leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'
    lines_n = 0
    left = ''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)
Running the code above will inevitably result in a UnicodeDecodeError.
Traceback (most recent call last):
File "chunk_tester.py", line 46, in <module>
lines, left = process_batch(fin, start, size, last, left)
File "chunk_tester.py", line 24, in process_batch
chunk = f.read(chunk_size)
File "lib\codecs.py", line 322, in decode
(result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte
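The 0xa9 in the traceback is telling: it is the continuation byte of a two-byte UTF-8 sequence (for example 'é', encoded as 0xc3 0xa9) that got split across a chunk boundary. A minimal reproduction, with an arbitrary example string:

```python
encoded = "café".encode("utf-8")  # 'é' is two bytes: 0xc3 0xa9
print(len(encoded))  # 5 bytes for 4 characters

try:
    encoded[4:].decode("utf-8")  # a "chunk" starting mid-character
except UnicodeDecodeError as exc:
    print(exc)  # 'utf-8' codec can't decode byte 0xa9 in position 0: invalid start byte
```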
You were so close! A relatively simple change to your final code (reading in the data as bytes and not str) makes it all (almost) work.
The main issue was that reading from binary files counts bytes, but reading from text files counts text: you did your first counting in bytes and your second in characters, which led your assumptions about what data had already been read to be wrong. It has nothing to do with an internal, hidden buffer.
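The mismatch is easy to demonstrate (the string here is arbitrary): any non-ASCII character makes the byte count and the character count diverge, so a size computed from byte offsets does not correspond to the same number of characters read in text mode.

```python
text = "naïve résumé\n"
print(len(text))                  # 13 characters
print(len(text.encode("utf-8")))  # 16 bytes: ï, é, é take two bytes each
```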
Other changes:

- Split on b'\n' instead of using bytes.splitlines(), and only remove blank lines after the relevant detection code.
- chunkify can be replaced by a simpler, faster loop that's functionally identical without having to keep the file open.

This gives the final code:
from os import stat


def chunkify(pfin, buf_size=1024**2):
    file_end = stat(pfin).st_size
    i = -buf_size
    for i in range(0, file_end - buf_size, buf_size):
        yield i, buf_size, False

    leftover = file_end % buf_size
    if leftover == 0:  # if the last section is buf_size in size
        leftover = buf_size
    yield i + buf_size, leftover, True


def process_batch(pfin, chunk_start, chunk_size, is_last, leftover):
    with open(pfin, 'rb') as f:
        f.seek(chunk_start)
        chunk = f.read(chunk_size)

    # Add previous leftover to current chunk
    chunk = leftover + chunk
    batch = chunk.split(b'\n')

    # If this chunk is not the last one,
    # pop the last item as that will be an incomplete sentence
    # We return this leftover to use in the next chunk
    if not is_last:
        leftover = batch.pop(-1)

    return [s.decode('utf-8') for s in filter(None, batch)], leftover


if __name__ == '__main__':
    fin = r'ep+gutenberg+news+wiki.txt'
    lines_n = 0
    left = b''
    for start, size, last in chunkify(fin):
        lines, left = process_batch(fin, start, size, last, left)

        if not lines:
            continue

        for line in lines:
            print(line)
        print('\n')

        numberlines = len(lines)
        lines_n += numberlines

    print(lines_n)
You have an interesting problem here: you have n processes that are each given the location of a chunk of data to process, but you can't provide the exact location of the chunks because you are dealing with lines and your locations are in bytes. Even if you split the file into lines to get the precise locations of the chunks, you are experiencing some issues.
Here's a solution that is suboptimal (I assume that you do not want to process lines sequentially: it seems obvious):
1. Split every chunk into B\nM\nA, where B (before) and A (after) do not contain any line feed, but M may contain line feeds.
2. Process M and put B\nA in a list at the current chunk index.
3. Finally, process all the B\nA elements.

This is suboptimal because once you have processed every M, you still have to process all the B\nA elements, and that last work must wait for the other processes to be complete.
Here's the code:
def chunkify(file_end, buf_size=1024):
    """Yield chunks of `buf_size` bytes"""
    for chunk_start in range(0, file_end, buf_size):
        yield chunk_start, min(buf_size, file_end - chunk_start)


def process_batch(remainders, i, f, chunk_start, chunk_size):
    """Process a chunk"""
    f.seek(chunk_start)
    chunk = f.read(chunk_size)
    chunk, remainders[i] = normalize(chunk)
    # process chunk here if chunk is not None
    return chunk


def normalize(chunk):
    """Return `M, B\\nA`

    The chunk format is `B\\nM\\nA` where `B` (before) and `A` (after)
    do not contain any line feed, but `M` may contain line feeds."""
    i = chunk.find(b"\n")
    j = chunk.rfind(b"\n")
    if i == -1 or i == j:
        return None, chunk
    else:
        return chunk[i+1:j], chunk[:i]+chunk[j:]
Note that if the chunk has no middle (M part), we return None as the chunk and everything is sent to remainders.
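To make that behaviour concrete, here is normalize applied to two hypothetical chunks (the function is repeated so the snippet runs on its own):

```python
def normalize(chunk):
    """Return (M, B\\nA); M is None when the chunk holds no complete line."""
    i = chunk.find(b"\n")
    j = chunk.rfind(b"\n")
    if i == -1 or i == j:
        return None, chunk
    return chunk[i + 1:j], chunk[:i] + chunk[j:]

print(normalize(b"tail\nwhole line\nhead"))      # (b'whole line', b'tail\nhead')
print(normalize(b"end of one\nstart of next"))  # (None, b'end of one\nstart of next')
```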
Some tests:
text = """She returned bearing mixed lessons from a society where the tools of democracy still worked.
If you think you can sense a "but" approaching, you are right.
Elsewhere, Germany take on Brazil and Argentina face Spain, possibly without Lionel Messi.
What sort of things do YOU remember best?'
Less than three weeks after taking over from Lotz at Wolfsburg.
The buildings include the Dr. John Micallef Memorial Library.
For women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for breast cancer.
In one interview he claimed it was from the name of the Cornish language ("Kernewek").
8 Goldschmidt was out of office between 16 and 19 July 1970.
Last year a new law allowed police to shut any bar based on security concerns.
But, Frum explains: "Glenn Beck takes it into his head that this guy is bad news."
Carrying on the Romantic tradition of landscape painting.
This area has miles of undeveloped beach adjacent to the headlands.
The EAC was created in 2002 to help avoid a repeat of the disputed 2000 presidential election.
In May 1945, remnants of the German Army continue fight on in the Harz mountains, nicknamed "The Void" by American troops.
Dietler also said Abu El Haj was being opposed because she is of Palestinian descent.
The auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disorder.
GAAP operating profit was $13.2 million and $7.1 million in the second quarter of 2008 and 2007, respectively.
Doc, Ira, and Rene are sent home as part of the seventh bond tour.
only I am sick of always hearing him called the Just.
Also there is Meghna River in the west of Brahmanbaria.
The explosives were the equivalent of more than three kilograms of dynamite - equal to 30 grenades," explained security advisor Markiyan Lubkivsky to reporters gathered for a news conference in Kyiv.
Her mother first took her daughter swimming at the age of three to help her with her cerebal palsy.
A U.S. aircraft carrier, the USS "Ticonderoga", was also stationed nearby.
Louis shocked fans when he unexpectedly confirmed he was expecting a child in summer 2015.
99, pp.
Sep 19: Eibar (h) WON 6-1"""
import io, os


def get_line_count(chunk):
    return 0 if chunk is None else len(chunk.split(b"\n"))


def process(f, buf_size):
    f.seek(0, os.SEEK_END)
    file_end = f.tell()
    remainders = [b""] * (file_end // buf_size + 1)
    L = 0
    for i, (start, n) in enumerate(chunkify(file_end, buf_size)):
        chunk = process_batch(remainders, i, f, start, n)
        L += get_line_count(chunk)

    print("first pass: lines processed", L)
    print("remainders", remainders)
    last_chunk = b"".join(remainders)
    print("size of last chunk {} bytes, {} lines".format(len(last_chunk), get_line_count(last_chunk)))
    L += get_line_count(last_chunk)
    print("second pass: lines processed", L)


process(io.BytesIO(bytes(text, "utf-8")), 256)
process(io.BytesIO(bytes(text, "utf-8")), 512)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 4096)
with open("/home/jferard/prog/stackoverlfow/ep+gutenberg+news+wiki.txt", 'rb') as f:
    process(f, 16384)
Output:
first pass: lines processed 18
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nWhat sort', b" of things do YOU remember best?'\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br", b'east cancer.\nBut, Frum explai', b'ns: "Glenn Beck takes it into his head that this guy is bad news."\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b'ted 2000 presidential election.\nThe auction hig', b"hlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nAlso there is Meghn', b'a River in the west of Brahmanbaria.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nS', b'ep 19: Eibar (h) WON 6-1']
size of last chunk 880 bytes, 9 lines
second pass: lines processed 27
first pass: lines processed 21
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked.\nFor women who do not have the genes, the risk drops to just 2% for ovarian cancer and 12% for br', b'east cancer.\nThe EAC was created in 2002 to help avoid a repeat of the dispu', b"ted 2000 presidential election.\nThe auction highlights AstraZeneca's current focus on boosting returns to shareholders as it heads into a wave of patent expiries on some of its biggest selling medicines including Nexium, for heartburn and stomach ulcers, and Seroquel for schizophrenia and bipolar disor", b'der.\nHer mother first to', b'ok her daughter swimming at the age of three to help her with her cerebal palsy.\nSep 19: Eibar (h) WON 6-1']
size of last chunk 698 bytes, 6 lines
second pass: lines processed 27
first pass: lines processed 96963
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nOne of the Ffarquhar ', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 517905 bytes, 3037 lines
second pass: lines processed 100000
first pass: lines processed 99240
remainders [b'She returned bearing mixed lessons from a society where the tools of democracy still worked, but where the native Dutch were often less than warm to her and her fellow exiles.\nSoon Carroll was in push-up position walking her hands tow', b'ard the mirror at one side of the room while her feet were dragged along by the casual dinnerware.\nThe track "Getaway" was inspired by and allud', ..., b'the old device, Apple will give customers a gift card that can be applied toward the purchase of the new iPhone.']
size of last chunk 130259 bytes, 760 lines
second pass: lines processed 100000
The last example shows that you can process 99,240 out of 100,000 lines in parallel, but you have to process the last 760 lines (130 kio) after all the processes are complete.
Note on concurrency: each subprocess owns a fixed cell of the remainders list, hence there should be no memory corruption. It might be cleaner to store each remainder in its own process object (a wrapper around the real subprocess) and join all the remainders once the processes are finished.
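A minimal end-to-end sketch of the two-pass scheme (the function names and the temporary file are hypothetical; the built-in map stands in for multiprocessing.Pool.map, which would fit here because each chunk job is independent and each worker reopens the file itself):

```python
import os
import tempfile

def chunk_worker(args):
    """Read one chunk of the file and split it into M and B\\nA.

    Returns (number of complete lines in M, remainder bytes B\\nA)."""
    path, start, size = args
    with open(path, "rb") as f:  # each worker opens the file itself
        f.seek(start)
        chunk = f.read(size)
    i, j = chunk.find(b"\n"), chunk.rfind(b"\n")
    if i == -1 or i == j:  # no complete middle: the whole chunk is remainder
        return 0, chunk
    return len(chunk[i + 1:j].split(b"\n")), chunk[:i] + chunk[j:]

def count_lines(path, buf_size):
    file_end = os.path.getsize(path)
    jobs = [(path, start, min(buf_size, file_end - start))
            for start in range(0, file_end, buf_size)]
    # First pass; swap map for multiprocessing.Pool.map to parallelize.
    results = list(map(chunk_worker, jobs))
    total = sum(n for n, _ in results)
    # Second pass: concatenated remainders reassemble the broken lines.
    last_chunk = b"".join(r for _, r in results)
    return total + len(last_chunk.split(b"\n"))

if __name__ == "__main__":
    with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
        tmp.write(b"\n".join(b"line %d" % i for i in range(100)))
        path = tmp.name
    print(count_lines(path, 64))  # 100
    os.unlink(path)
```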