I am trying to copy a large dataset (larger than RAM) to S3 using SQLAlchemy. My constraint is:
I just want to pipe data from a DB to S3 in a memory-efficient way.
I can do it with normal-sized datasets (using the logic below), but with a larger dataset I hit a buffer issue.
The first problem I solved is that executing a query usually buffers the whole result in memory. To avoid that, I stream the results and read them in batches with the fetchmany() method.
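import sqlalchemy

engine = sqlalchemy.create_engine(db_url)
with engine.connect() as conn:
    # execution_options() returns the object to use, so keep its return value
    conn = conn.execution_options(stream_results=True)
    results = conn.execute(sqlalchemy.text('SELECT * FROM tableX;'))
    while True:
        chunk = results.fetchmany(10000)
        if not chunk:
            break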
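For illustration, the fetchmany() loop can be wrapped in a generator so that only one batch of rows is held in memory at a time. This is a minimal sketch, not part of the original code: `iter_chunks` is a hypothetical helper name, and an in-memory SQLite table stands in for the real database (SQLite has no server-side cursor, so it only demonstrates the loop, not the effect of stream_results).

```python
import sqlite3


def iter_chunks(cursor, chunk_size=10000):
    """Yield lists of rows until the cursor is exhausted (hypothetical helper)."""
    while True:
        chunk = cursor.fetchmany(chunk_size)
        if not chunk:
            return
        yield chunk


# Stand-in data: an in-memory SQLite table playing the role of tableX.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tableX (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO tableX VALUES (?, ?)",
    [(i, f"row{i}") for i in range(25)],
)

cursor = conn.execute("SELECT * FROM tableX ORDER BY id")
chunk_sizes = [len(chunk) for chunk in iter_chunks(cursor, chunk_size=10)]
```

The same loop should work with any result object that exposes fetchmany(), including the SQLAlchemy result used above.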
On the other side, I have a StringIO buffer that I feed with the chunk returned by fetchmany(). Then I send its content to S3.
from io import StringIO
import boto3
import csv
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
csv_writer.writerows(chunk)
s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
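The CSV step can be isolated from the upload. The sketch below assumes a hypothetical helper, `rows_to_csv_bytes`, that serializes one batch of rows with the same `;` delimiter and returns UTF-8 bytes ready to be sent as a request body.

```python
import csv
from io import StringIO


def rows_to_csv_bytes(rows, delimiter=";"):
    """Serialize an iterable of row tuples to CSV and return UTF-8 bytes."""
    buffer = StringIO()
    csv.writer(buffer, delimiter=delimiter).writerows(rows)
    return buffer.getvalue().encode("utf-8")


# A field containing the delimiter is quoted automatically by the csv module.
payload = rows_to_csv_bytes([(1, "a"), (2, "b;c")])
```

Creating a fresh buffer per batch keeps memory bounded by the batch size, which a single shared StringIO would not.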
The problem I have is essentially a design issue: how do I make these parts work together? Is it even possible in the same runtime?
engine = sqlalchemy.create_engine(db_url)
s3_resource = boto3.resource('s3')
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=';')
with engine.connect() as conn:
    conn = conn.execution_options(stream_results=True)
    results = conn.execute(sqlalchemy.text('SELECT * FROM tableX;'))
    while True:
        chunk = results.fetchmany(10000)
        if not chunk:
            break
        csv_writer.writerows(chunk)
        s3_resource.Object(bucket, s3_key).put(Body=csv_buffer.getvalue())
I can make it work for one cycle of fetchmany, but not several. Any idea?
I'm assuming that by "make these parts work together" you mean you want a single file in S3 instead of just parts. All you need to do is create a file object that, when read, fetches the next batch from the query and buffers it. We can make use of Python's generators:
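import csv
from io import StringIO
import sqlalchemy

def _generate_chunks(engine):
    with engine.begin() as conn:
        # stream_results asks the driver for a server-side cursor where supported
        conn = conn.execution_options(stream_results=True)
        results = conn.execute(sqlalchemy.text(""))  # your SELECT statement goes here
        while True:
            chunk = results.fetchmany(10000)
            if not chunk:
                break
            # a fresh buffer per batch keeps memory bounded by the batch size
            csv_buffer = StringIO()
            csv_writer = csv.writer(csv_buffer, delimiter=';')
            csv_writer.writerows(chunk)
            yield csv_buffer.getvalue().encode("utf-8")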
This is a stream of chunks of your file, so all we need to do is to stitch these together (lazily, of course) into a file object:
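import io

class CombinedFile(io.RawIOBase):
    """Read-only file object that lazily concatenates an iterable of bytes chunks."""
    def __init__(self, strings):
        self._buffer = b""  # must be bytes, because the chunks are UTF-8 encoded
        self._strings = iter(strings)
    def readable(self):
        return True
    def read(self, size=-1):
        if size < 0:
            return self.readall()
        if not self._buffer:
            try:
                # pull the next chunk only once the current one is used up
                self._buffer = next(self._strings)
            except StopIteration:
                pass  # exhausted: fall through and return b"" to signal EOF
        if len(self._buffer) > size:
            ret, self._buffer = self._buffer[:size], self._buffer[size:]
        else:
            ret, self._buffer = self._buffer, b""
        return ret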
chunks = _generate_chunks(engine)
file = CombinedFile(chunks)
upload_file_object_to_s3(file)
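Note that a read(size) call on a raw stream may return fewer bytes than requested, since it never crosses a chunk boundary. If the consumer expects full-size reads, one possible variation (a sketch, not the code above) is to implement readinto() and wrap the raw stream in io.BufferedReader, which keeps reading until the requested size or EOF is reached. The names `IterStream` and `as_buffered_file` are hypothetical.

```python
import io


class IterStream(io.RawIOBase):
    """Raw read-only stream over an iterable of bytes chunks (hypothetical name)."""

    def __init__(self, chunks):
        self._chunks = iter(chunks)
        self._leftover = b""

    def readable(self):
        return True

    def readinto(self, b):
        # Refill from the iterator, skipping empty chunks so 0 only means EOF.
        while not self._leftover:
            try:
                self._leftover = next(self._chunks)
            except StopIteration:
                return 0
        n = min(len(b), len(self._leftover))
        b[:n] = self._leftover[:n]
        self._leftover = self._leftover[n:]
        return n


def as_buffered_file(chunks, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """Wrap the raw stream so read(size) spans chunk boundaries."""
    return io.BufferedReader(IterStream(chunks), buffer_size=buffer_size)


stream = as_buffered_file([b"1;a\r\n", b"2;b\r\n", b"3;c\r\n"])
first = stream.read(8)   # crosses the boundary between the first two chunks
rest = stream.read()     # drains the remainder
```

The chunks are still pulled lazily, so memory use stays bounded by the chunk size plus the reader's buffer.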
Streaming the file object to S3 is left as an exercise for the reader. (You can probably use put_object.)
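As one possible way to do that step, boto3's S3 client provides upload_fileobj(), a managed transfer that reads from a binary file-like object and performs a multipart upload if necessary. The sketch below is an assumption about how it could be wired up, not a tested production setup: `upload_stream` is a hypothetical wrapper, and the client is passed in as a parameter so the function can be exercised with a stub.

```python
def upload_stream(fileobj, bucket, key, s3_client=None):
    """Upload a readable binary file object to the given bucket and key."""
    if s3_client is None:
        # Imported lazily so the function can be tested without boto3 installed.
        import boto3
        s3_client = boto3.client("s3")
    # upload_fileobj only needs fileobj.read(); it never loads the whole
    # stream into memory at once.
    s3_client.upload_fileobj(fileobj, bucket, key)
```

With the objects from the answer, the call would look like `upload_stream(file, bucket, s3_key)`, where `bucket` and `s3_key` are the names used in the question.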