 

Generate and stream compressed file with Flask

Tags: python, zip, flask
I am able to generate and stream text on the fly, but unable to generate and stream a compressed file on the fly.

from flask import Flask, request, Response,stream_with_context
import zlib
import gzip

app = Flask(__name__)

def generate_text():
    for x in range(10000):
        yield f"this is my line: {x}\n".encode()

@app.route('/stream_text')
def stream_text():
    response = Response(stream_with_context(generate_text()))
    return response

def generate_zip():
    for x in range(10000):
        yield zlib.compress(f"this is my line: {x}\n".encode())

@app.route('/stream_zip')
def stream_zip():
    response = Response(stream_with_context(generate_zip()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

Then, using curl and gunzip:

curl http://127.0.0.1:8000/stream_zip > data.gz

gunzip data.gz
gunzip: data.gz: not in gzip format

I don't care if it is zip, gzip, or any other type of compression.

generate_text in my real code generates over 4 GB of data so I would like to compress on the fly.

Saving the text to a file, zipping it, returning the zip file, and then deleting it is not the solution I'm after.

I need to be in a loop: generate some text -> compress that text -> stream the compressed data, until I'm done.

zip/gzip ... anything is fine as long as it works.

asked May 25 '17 by mattjvincent

2 Answers

You are yielding a series of complete compressed documents, not a single compressed stream. Don't use zlib.compress(); every call includes a header and forms a standalone document.

You need to create a zlib.compressobj() object instead, and use the Compress.compress() method on that object to produce a stream of data (followed by a final call to Compress.flush()):

def generate_zip():
    compressor = zlib.compressobj()
    for x in range(10000):
        chunk = compressor.compress(f"this is my line: {x}\n".encode())
        if chunk:
            yield chunk
    yield compressor.flush()

The compressor can produce empty blocks when there is not yet enough data to fill a compressed-data chunk, so the code above only yields when there is actually something to send. Because your input is so highly repetitive and thus compresses efficiently, this yields only 3 times: once with the 2-byte header, once with about 21kb of compressed data covering the first 8288 iterations of range(), and finally with the remaining ~4kb for the rest of the loop.

In aggregate, this produces the same data as a single zlib.compress() call with all inputs concatenated. The correct mime-type for this data format is application/zlib, not application/zip.
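To see the difference concretely, here is a standalone sketch (shortened to three lines of input) contrasting the two approaches:

```python
import zlib

parts = [f"this is my line: {x}\n".encode() for x in range(3)]

# Wrong: each zlib.compress() call emits a complete document with its own
# header, so joining the results glues three documents together.
broken = b"".join(zlib.compress(p) for p in parts)
d = zlib.decompressobj()
assert d.decompress(broken) == parts[0]  # decoding stops at the first document
assert d.unused_data                     # the other documents are unconsumed

# Right: a single compressobj produces one continuous stream.
c = zlib.compressobj()
stream = b"".join([c.compress(p) for p in parts]) + c.flush()
assert zlib.decompress(stream) == b"".join(parts)
```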

This format is not readily decompressible with the gunzip tool, however, not without some trickery: the code above produces a raw zlib-compressed stream, not a GZIP file. To make it GZIP-compatible, you need to configure the compression correctly, send a gzip header first, and append a CRC checksum and the data length at the end:

import zlib
import struct
import time

def generate_gzip():
    # Yield a gzip file header first.
    yield bytes([
        0x1F, 0x8B, 0x08, 0x00,  # Gzip file, deflate, no filename
        *struct.pack('<L', int(time.time())),  # compression start time
        0x02, 0xFF,  # maximum compression, no OS specified
    ])

    # bookkeeping: the compression state, running CRC and total length
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32(b"")
    length = 0

    for x in range(10000):
        data = f"this is my line: {x}\n".encode()
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xFFFFFFFF
        length += len(data)

    # Finishing off, send remainder of the compressed data, and CRC and length
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xFFFFFFFF)

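Before wiring the generator into Flask, it can be sanity-checked standalone by joining its chunks and decompressing them with Python's own gzip module (just a quick verification sketch, with the generator reproduced so it runs on its own):

```python
import gzip
import struct
import time
import zlib

def generate_gzip():
    # Same generator as above, reproduced here so the sketch runs standalone.
    yield bytes([
        0x1F, 0x8B, 0x08, 0x00,  # Gzip file, deflate, no filename
        *struct.pack('<L', int(time.time())),  # compression start time
        0x02, 0xFF,  # maximum compression, no OS specified
    ])
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32(b"")
    length = 0
    for x in range(10000):
        data = f"this is my line: {x}\n".encode()
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xFFFFFFFF
        length += len(data)
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xFFFFFFFF)

stream = b"".join(generate_gzip())
expected = b"".join(f"this is my line: {x}\n".encode() for x in range(10000))
assert gzip.decompress(stream) == expected
```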
Serve this as application/gzip:

@app.route('/stream_gzip')
def stream_gzip():
    response = Response(stream_with_context(generate_gzip()), mimetype='application/gzip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

and the result can be decompressed on the fly:

curl http://127.0.0.1:8000/stream_gzip | gunzip -c | less
answered Nov 18 '22 by Martijn Pieters

While I was extremely impressed by Martijn's solution, I decided to roll my own solution that uses pigz for better performance:

def yield_pigz(results, compresslevel=1):
    # Feed pigz on a background thread while this generator drains its stdout.
    cmd = ['pigz', '-%d' % compresslevel]
    pigz_proc = subprocess.Popen(cmd, bufsize=0,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def writer():
        # Push each uncompressed chunk into pigz; closing stdin signals EOF
        # so pigz flushes its remaining output and exits.
        for result in results:
            pigz_proc.stdin.write(result)
            pigz_proc.stdin.flush()
        pigz_proc.stdin.close()

    t = threading.Thread(target=writer)
    t.start()
    try:
        while True:
            buf = pigz_proc.stdout.read(4096)
            if not buf:
                break
            yield buf
    finally:
        t.join()
        pigz_proc.wait()

Keep in mind that you'll need to import subprocess and threading for this to work. You will also need to install the pigz program (already in the repositories of most Linux distributions; on Ubuntu, just use sudo apt install pigz -y).

Example usage:

from flask import Flask, Response
import subprocess
import threading
import random

app = Flask(__name__)

def yield_something_random():
    for i in range(10000):
        seq = [chr(random.randint(ord('A'), ord('Z'))) for c in range(1000)]
        yield ''.join(seq).encode()  # pigz's stdin needs bytes, not str

@app.route('/')
def index():
    return Response(yield_pigz(yield_something_random()))
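If pigz isn't available, a similar streaming wrapper can be built from the standard library alone by pointing gzip.GzipFile at a small collecting buffer. This is a sketch, not part of either answer; _ChunkBuffer and stream_gzip_stdlib are made-up names:

```python
import gzip
import io

class _ChunkBuffer(io.RawIOBase):
    """File-like sink that collects whatever GzipFile writes to it."""
    def __init__(self):
        self.chunks = []
    def writable(self):
        return True
    def write(self, b):
        self.chunks.append(bytes(b))
        return len(b)

def stream_gzip_stdlib(results, compresslevel=1):
    buf = _ChunkBuffer()
    with gzip.GzipFile(fileobj=buf, mode='wb',
                       compresslevel=compresslevel) as gz:
        for result in results:
            gz.write(result)
            # Hand over whatever the compressor has emitted so far.
            while buf.chunks:
                yield buf.chunks.pop(0)
    # Closing the GzipFile flushed the remaining data and the gzip trailer.
    while buf.chunks:
        yield buf.chunks.pop(0)

data = b"".join(stream_gzip_stdlib([b"hello ", b"world"] * 100))
assert gzip.decompress(data) == b"hello world" * 100
```

It is single-threaded and won't match pigz's throughput, but it produces a valid gzip stream with no external dependency.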
answered Nov 18 '22 by d33tah