I need to process a large remote CSV line by line without downloading it entirely.
Below is the closest I got. I iterate byte chunks from Azure, and have some code to handle truncated lines. But this cannot work if csv values contain a newline as I am not able to discernate between value newlines and csv newlines.
# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    # get a StorageStreamDownloader
    # https://docs.microsoft.com/en-us/python/api/azure-storage-file-datalake/azure.storage.filedatalake.storagestreamdownloader?view=azure-python
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    truncated_line = ''
    for chunk in file_handle.chunks():
        # have the previous truncated line appended to the next block
        chunk_txt = truncated_line + chunk.decode("utf-8")
        lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
        for line in lines[0:len(lines)-2]:
            yield line
        truncated_line = lines[len(lines)-1]
    # process the last chunk (same code)
    chunk_txt = truncated_line
    lines = chunk_txt.split('\n') # THIS CANNOT WORK AS VALUES CONTAIN NEWLINES
    for line in lines[0:len(lines)-2]:
        yield line
    truncated_line = lines[len(lines)-1]
Ideally I would use csv.DictReader() but I was not able to to so as it downloads the file entirely.
# this does not work
def azure_iter_lines(logger_scope, client, file_path):
    file_client = client.get_file_client(file_path)
    file_handle = file_client.download_file()
    buffer = io.BytesIO()
    file_handle.readinto(buffer) # THIS DOWNLOADS THE FILE ENTIRELY
    csvreader = csv.DictReader(buffer, delimiter=";")
    return csvreader
Here is an update using some hints by @H.Leger
Please note that this still does not work
file_client = client.get_file_client(file_path)
file_handle = file_client.download_file()
stream = codecs.iterdecode(file_handle.chunks(), 'utf-8')
csvreader = csv.DictReader(stream, delimiter=";")
for row in csvreader:
    print(row)
# => _csv.Error: new-line character seen in unquoted field - do you need to open the file in universal-newline mode?
EDIT: Final solution based on @paiv answer
EDIT: Updated solution to use io instead of codecs for faster parsing
import io
import csv
import ctypes as ct
# bytes chunk iterator to python stream adapter 
# https://stackoverflow.com/a/67547597/2523414
class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        self.closed = False
    
    def readable(self):
        return True
        
    def writable(self):
        return False
    
    def seekable(self):
        return False
        
    def close(self):
        self.closed = True
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res
# get the downloader object
file_client = client.get_file_client(file_path)
downloader = file_client.download_file()
# adapt the downloader iterator to a byte stream
file_object = ChunksAdapter(downloader.chunks())
# decode bytes stream to utf-8
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='') 
# update csv field limit to handle large fields
# https://stackoverflow.com/a/54517228/2523414
csv.field_size_limit(int(ct.c_ulong(-1).value // 2)) 
csvreader = csv.DictReader(text_stream, delimiter=";", quotechar='"', quoting=csv.QUOTE_MINIMAL)
for row in csvreader:
    print(row)
                Disclaimer: I know little Azure specifics. Ultimately, you would want to stream separate chunks too.
In Python, given a file object, you can set up CSV streaming this way:
import codecs
import csv
codec = codecs.getreader('utf-8')
text_stream = codec(file_object)
csvreader = csv.DictReader(text_stream)
Now you can iterate over csvreader, and it will read from file_object in a streaming fasion.
Edit: as @Martijn Pieters suggested, we can gain performance with TextIOWrapper instead of codecs:
text_stream = io.TextIOWrapper(file_object, encoding='utf-8', newline='')
Check the comment in csv module on newline parameter.
But Azure's StorageStreamDownloader does not provide python's file object interface. It has .chunks() generator (which I assume will invoke separate HTTP request to retrieve next chunk).
You can adapt .chunks() into a file object with a simple adapter:
class ChunksAdapter:
    def __init__(self, chunks):
        self.chunks = chunks
        self.buf = b''
        
    def read(self, size):
        if not self.buf:
            self.buf = next(self.chunks, b'')
        res, self.buf = self.buf[:size], self.buf[size:]
        return res
And use like
downloader = file_client.download_file()
file_object = ChunksAdapter(downloader.chunks())
Be sure to configure DictReader for the appropriate CSV dialect.
And set appropriate values for max_single_get_size, max_chunk_get_size on the blob client.
I believe the requests package can be useful for you. Using the stream option while getting your file and the Response.iter_lines() function should do what you need :
import codecs
import csv
import requests
url = "https://navitia.opendatasoft.com//explore/dataset/all-datasets/download?format=csv"
r = requests.get(url, stream=True)  # using the stream option to avoid loading everything
try:
    buffer = r.iter_lines()  # iter_lines() will feed you the distant file line by line
    reader = csv.DictReader(codecs.iterdecode(buffer, 'utf-8'), delimiter=';')
    for row in reader:
        print(row)  # Do stuff here
finally:
    r.close()
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With