I am able to generate and stream text on the fly, but unable to generate and stream a compressed file on the fly.
from flask import Flask, request, Response, stream_with_context
import zlib
import gzip

app = Flask(__name__)

def generate_text():
    for x in range(10000):
        yield f"this is my line: {x}\n".encode()

@app.route('/stream_text')
def stream_text():
    response = Response(stream_with_context(generate_text()))
    return response

def generate_zip():
    for x in range(10000):
        yield zlib.compress(f"this is my line: {x}\n".encode())

@app.route('/stream_zip')
def stream_zip():
    response = Response(stream_with_context(generate_zip()), mimetype='application/zip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)
Then, using curl and gunzip:
curl http://127.0.0.1:8000/stream_zip > data.gz
gunzip data.gz
gunzip: data.gz: not in gzip format
I don't care if it is zip, gzip, or any other type of compression.
generate_text in my real code generates over 4 GB of data, so I would like to compress on the fly.
Saving the text to a file, zipping it, returning the zip file, and then deleting it is not the solution I'm after.
I need to be in a loop generating some text -> compress that text -> streaming compressed data until I'm done.
zip/gzip ... anything is fine as long as it works.
You are yielding a series of compressed documents, not a single compressed stream. Don't use zlib.compress(); each call includes the header and forms a single, complete document.
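To see the problem in isolation, here is a minimal sketch (not part of the Flask app) that glues together a few per-line zlib.compress() results, the way the original generate_zip() does, and feeds them to one decompressor. The three-line input and the variable names are only illustrative assumptions.

```python
import zlib

lines = [f"this is my line: {x}\n".encode() for x in range(3)]

# What the original generate_zip() effectively sends: one complete zlib
# stream per line, glued together.
blob = b"".join(zlib.compress(line) for line in lines)

decoder = zlib.decompressobj()
first = decoder.decompress(blob)

print(first)                     # only the first line is decoded
print(decoder.eof)               # the first stream has already ended
print(len(decoder.unused_data))  # bytes of the later streams, left over
```

The decompressor stops at the end of the first stream, which is why a client expecting a single compressed file cannot make sense of the response.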
You need to create a zlib.compressobj() object instead, and use the Compress.compress() method on that object to produce a stream of data (followed by a final call to Compress.flush()):
def generate_zip():
    compressor = zlib.compressobj()
    for x in range(10000):
        chunk = compressor.compress(f"this is my line: {x}\n".encode())
        if chunk:
            yield chunk
    yield compressor.flush()
The compressor can return empty byte strings when it does not yet have enough data to emit a full compressed chunk, so the code above only yields when there is actually something to send. Because your input is so highly repetitive and therefore compresses very efficiently, this yields only 3 times: once with the 2-byte header, once with about 21 KB of compressed data covering the first 8288 iterations over range(), and finally with the remaining 4 KB for the rest of the loop.
In aggregate, this produces the same data as a single zlib.compress() call with all inputs concatenated. The correct mime-type for this data format is application/zlib, not application/zip.
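As a quick sanity check outside Flask, the following sketch collects the chunks from a compressobj-based generator and confirms that one zlib.decompress() call recovers every line. The helper name stream_zlib and the idea of passing the lines in as an argument are assumptions made for the example; the exact chunk count and sizes can differ between zlib versions, so they are printed rather than asserted.

```python
import zlib

def stream_zlib(lines):
    """Yield the pieces of ONE zlib stream covering every line."""
    compressor = zlib.compressobj()
    for line in lines:
        chunk = compressor.compress(line)
        if chunk:  # skip the empty results returned while zlib is buffering
            yield chunk
    yield compressor.flush()

lines = [f"this is my line: {x}\n".encode() for x in range(10000)]
chunks = list(stream_zlib(lines))

# A single decompress call recovers everything, so the chunks form one stream.
restored = zlib.decompress(b"".join(chunks))
print(len(chunks), "chunks,", sum(map(len, chunks)), "compressed bytes")
print(restored == b"".join(lines))
```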
However, gzip cannot readily decompress this format, at least not without some trickery. That's because the above doesn't yet produce a GZIP file; it just produces a zlib-compressed stream. To make it GZIP compatible, you need to configure the compression correctly, send a header first, and add a CRC checksum and data length value at the end:
import zlib
import struct
import time

def generate_gzip():
    # Yield a gzip file header first.
    yield bytes([
        0x1F, 0x8B, 0x08, 0x00,  # Gzip file, deflate, no filename
        *struct.pack('<L', int(time.time())),  # compression start time
        0x02, 0xFF,  # maximum compression, no OS specified
    ])

    # bookkeeping: the compression state, running CRC and total length
    compressor = zlib.compressobj(
        9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0)
    crc = zlib.crc32(b"")
    length = 0

    for x in range(10000):
        data = f"this is my line: {x}\n".encode()
        chunk = compressor.compress(data)
        if chunk:
            yield chunk
        crc = zlib.crc32(data, crc) & 0xFFFFFFFF
        length += len(data)

    # Finishing off, send remainder of the compressed data, and CRC and length
    yield compressor.flush()
    yield struct.pack("<2L", crc, length & 0xFFFFFFFF)
Serve this as application/gzip:
@app.route('/stream_gzip')
def stream_gzip():
    response = Response(stream_with_context(generate_gzip()), mimetype='application/gzip')
    response.headers['Content-Disposition'] = 'attachment; filename=data.gz'
    return response
and the result can be decompressed on the fly:
curl http://127.0.0.1:8000/stream_gzip | gunzip -c | less
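As a possible alternative to writing the header and trailer by hand, zlib can emit the gzip container itself when 16 is added to the wbits argument. The sketch below is a variation on the generator above, not the same code: the names generate_gzip_wbits and decode_incrementally are made up for illustration, and the header fields (timestamp, OS byte) are whatever zlib chooses rather than the values set manually earlier. The second helper mimics in Python what the curl and gunzip pipeline does, decoding chunk by chunk.

```python
import gzip
import zlib

def generate_gzip_wbits(lines, level=9):
    """Yield one gzip stream; zlib writes the gzip header and trailer itself."""
    # Adding 16 to wbits selects the gzip container instead of the zlib one.
    compressor = zlib.compressobj(level, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    for line in lines:
        chunk = compressor.compress(line)
        if chunk:
            yield chunk
    yield compressor.flush()  # remaining data plus CRC-32 and length trailer

def decode_incrementally(chunks):
    """Decompress gzip chunks as they arrive, without joining them first."""
    decoder = zlib.decompressobj(zlib.MAX_WBITS | 16)
    for chunk in chunks:
        yield decoder.decompress(chunk)
    yield decoder.flush()

lines = [f"this is my line: {x}\n".encode() for x in range(10000)]
payload = b"".join(generate_gzip_wbits(lines))
print(gzip.decompress(payload) == b"".join(lines))
```

In a Flask view this generator would be wrapped in Response(stream_with_context(...)) with the application/gzip mimetype, the same way as generate_gzip().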
While I was extremely impressed by Martijn's solution, I decided to roll my own that uses pigz for better performance:
def yield_pigz(results, compresslevel=1):
    cmd = ['pigz', '-%d' % compresslevel]
    pigz_proc = subprocess.Popen(cmd, bufsize=0,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def f():
        # Feed the (bytes) results to pigz, then close stdin to signal EOF.
        for result in results:
            pigz_proc.stdin.write(result)
            pigz_proc.stdin.flush()
        pigz_proc.stdin.close()

    try:
        # Write from a separate thread so reading stdout here cannot deadlock.
        t = threading.Thread(target=f)
        t.start()
        while True:
            buf = pigz_proc.stdout.read(4096)
            if len(buf) == 0:
                break
            yield buf
    finally:
        t.join()
        pigz_proc.wait()
Keep in mind that you'll need to import subprocess and threading for this to work. You will also need to install the pigz program (already in the repositories of most Linux distributions; on Ubuntu, just use sudo apt install pigz -y).
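Since the approach depends on an external program, it may be worth checking for it before spawning the process. The sketch below is one way to do that with shutil.which; the helper name find_compressor and the idea of falling back to plain gzip (which also reads stdin and writes stdout when given no file arguments) are assumptions for illustration, not something the code above does.

```python
import shutil

def find_compressor(candidates=("pigz", "gzip")):
    """Return the path of the first candidate found on PATH, or None."""
    for name in candidates:
        path = shutil.which(name)
        if path:
            return path
    return None

compressor_path = find_compressor()
if compressor_path is None:
    print("no gzip-compatible compressor found on PATH")
else:
    # This list could replace the hard-coded cmd in yield_pigz().
    print("would run:", [compressor_path, "-1"])
```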
Example usage:
from flask import Flask, Response
import subprocess
import threading
import random

app = Flask(__name__)

# yield_pigz() from above goes here.

def yield_something_random():
    for i in range(10000):
        seq = [chr(random.randint(ord('A'), ord('Z'))) for _ in range(1000)]
        # pigz's stdin is a binary pipe, so yield bytes rather than str
        yield ''.join(seq).encode()

@app.route('/')
def index():
    return Response(yield_pigz(yield_something_random()))