Stream byte chunks to CSV rows in Python

I need to process a large remote CSV line by line without downloading it entirely.

Below is the closest I got. I iterate byte chunks from Azure and have some code to handle truncated lines. But this cannot work if CSV values contain a newline, as I am not able to distinguish between newlines inside values and newlines that terminate rows.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()

    truncated_line = ''
    for chunk in file_handle.chunks():
        # have the previous truncated line appended to the next block
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        for line in lines[:-1]: # yield every complete line
            yield line
        truncated_line = lines[-1] # keep the trailing partial line for the next chunk

    # flush whatever remains after the final chunk
    lines = truncated_line.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
    for line in lines:
        yield line
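
For reference, here is a minimal, self-contained illustration (with made-up data, not my real file) of why splitting on '\n' is ambiguous while the csv module is not:

import csv
import io

# a made-up two-column file where the second value contains a newline
data = 'id;note\n1;"first line\nsecond line"\n'

# naive splitting cuts the quoted value in half
print(data.split('\n'))
# => ['id;note', '1;"first line', 'second line"', '']

# the csv module respects the quoting and keeps the value intact
print(list(csv.reader(io.StringIO(data), delimiter=';')))
# => [['id', 'note'], ['1', 'first line\nsecond line']]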

Ideally I would use csv.DictReader(), but I was not able to do so as it downloads the file entirely.

# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    buffer.seek(0)
    # DictReader needs a text stream, so decode the buffered bytes
    text_stream = io.TextIOWrapper(buffer, encoding="utf-8", newline="")
    csvreader = csv.DictReader(text_stream, delimiter=";")
    return csvreader

Here is an update based on hints from @H.Leger

Please note that this still does not work:

file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
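
As far as I can tell, the problem is that csv treats each item coming out of the iterator as one line of input, but file_handle.chunks() yields large blocks, so every decoded chunk contains many row-terminating newlines that csv then sees as newlines inside a single (unquoted) line. A minimal reproduction:

import csv

# each item of the iterable is treated as one line, so a multi-line
# chunk puts a newline in an unquoted field position
list(csv.reader(['a;b\nc;d'], delimiter=';'))
# => _csv.Error: new-line character seen in unquoted field - do you need to
#    open the file in universal-newline mode?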

EDIT: Final solution based on @paiv's answer

EDIT: Updated solution to use io instead of codecs for faster parsing

import io
import csv
import ctypes as ct

# bytes chunk iterator to python stream adapter 
# https://stackoverflow.com/a/67547597/2523414

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        # serve from the leftover buffer, pulling the next chunk only when needed
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res



# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode the byte stream as utf-8; newline='' is required so that the csv
# module itself handles newlines embedded in quoted fields
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

# update csv field limit to handle large fields
# https://stackoverflow.com/a/54517228/2523414
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 

csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
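
With this adapter, only the current chunk (plus TextIOWrapper's small internal buffer) is held in memory at any time, and rows containing quoted newlines parse correctly.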
asked May 12 '21 by Clément Prévost


2 Answers

Disclaimer: I know little about the Azure specifics. Ultimately, you would want to stream separate chunks too.

In Python, given a file object, you can set up CSV streaming this way:

import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)

Now you can iterate over csvreader, and it will read from file_object in a streaming fashion.
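
For example, with an in-memory stand-in for file_object (made up for demonstration; any object with a compatible read method will do), a quoted newline comes through intact:

import codecs
import csv
import io

# stand-in byte stream; a real file object from the network works the same
file_object = io.BytesIO(b'a;b\n1;"line one\nline two"\n')
text_stream = codecs.getreader('utf-8')(file_object)
for row in csv.DictReader(text_stream, delimiter=';'):
    print(row)
# => {'a': '1', 'b': 'line one\nline two'}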

Edit: as @Martijn Pieters suggested, we can gain performance with TextIOWrapper instead of codecs:

text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')

Check the note on the newline parameter in the csv module documentation: if newline='' is not specified, newlines embedded inside quoted fields will not be interpreted correctly.

But Azure's StorageStreamDownloader does not provide Python's file object interface. It has a .chunks() generator (which, I assume, issues a separate HTTP request to retrieve each chunk).

You can adapt .chunks() into a file object with a simple adapter:

class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res

And use it like this:

downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())

Be sure to configure DictReader for the appropriate CSV dialect.

And set appropriate values for max_single_get_size, max_chunk_get_size on the blob client.
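
For example (a sketch only; I'm assuming the datalake service client forwards these keyword arguments to the underlying blob client, and the account URL and credential below are placeholders):

from azure.storage.filedatalake import DataLakeServiceClient

# placeholder account URL and credential; sizes are in bytes
service_client = DataLakeServiceClient(
    account_url="https://myaccount.dfs.core.windows.net",
    credential=credential,
    max_single_get_size=4 * 1024 * 1024,  # size of the initial download request
    max_chunk_get_size=4 * 1024 * 1024,   # size of each subsequent chunk request
)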

answered Sep 16 '22 by paiv


I believe the requests package can be useful for you. Using the stream option when getting your file, together with the Response.iter_lines() function, should do what you need:

import codecs
import csv
import requests

url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)  # using the stream option to avoid loading everything

try:
    buffer = r.iter_lines()  # iter_lines() will feed you the distant file line by line
    reader = csv.DictReader(codecs.iterdecode(buffer, 'utf-8'), delimiter=';')
    for row in reader:
        print(row)  # Do stuff here
finally:
    r.close()
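
Note that iter_lines() splits on raw newlines, so it has the same limitation as splitting chunks manually: a newline inside a quoted field will be cut into two "lines". A sketch of a variant that avoids this by wrapping the raw response stream instead (assuming the server lets you stream the body as-is):

import csv
import io
import requests

url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)
try:
    r.raw.decode_content = True  # transparently decompress if the body is gzip-encoded
    text_stream = io.TextIOWrapper(r.raw, encoding='utf-8', newline='')
    for row in csv.DictReader(text_stream, delimiter=';'):
        print(row)  # Do stuff here
finally:
    r.close()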
answered Sep 19 '22 by H.Leger