
Streaming a binary file to Google Storage using Tornado

I'm trying to stream a binary file from a client request to Google Cloud Storage through my server.

I'm using the Tornado framework to stream the data from the request to the server, and the Google Cloud Storage API's upload_from_file method to stream the file to Google.

I'm new to Tornado. I'm using the @stream_request_body decorator so I can receive the request data in chunks and upload each chunk to Google.

I've tried opening a file-like object that I can write each chunk to while uploading that 'file' to Google at the same time.

The problem is that I can't upload the 'file' to Google before I start writing chunks to it.

Any assistance would be much appreciated.

asked Oct 28 '22 21:10 by Liad Amsalem



1 Answer

Doing this with Google's HTTP libraries is tricky because they're designed for synchronous use. You need to put the actual upload on another thread to avoid blocking the IOLoop. You can use an os.pipe to communicate between the Tornado thread and the upload thread (wrap the writing end of the pipe in a PipeIOStream and the reading end in os.fdopen). Here's an untested sketch of a solution:

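import os
import threading

import tornado.ioloop
import tornado.iostream
import tornado.locks
import tornado.web


@tornado.web.stream_request_body
class UploadHandler(tornado.web.RequestHandler):
    def prepare(self):
        r, w = os.pipe()
        self.write_pipe = tornado.iostream.PipeIOStream(w)
        # Create our "file-like object" for upload_from_file.
        # Open it in binary mode, since we're streaming raw bytes.
        self.read_pipe = os.fdopen(r, "rb")
        # Create an event for the upload thread to communicate back
        # to tornado when it's done, and save a reference to our IOLoop.
        self.upload_done = tornado.locks.Event()
        self.upload_error = None
        self.io_loop = tornado.ioloop.IOLoop.current()
        # Consider using a tornado.locks.Semaphore to limit the number of
        # threads you can create.
        self.thread = threading.Thread(target=self.upload_file)
        self.thread.start()

    def upload_file(self):
        try:
            # google_client is a placeholder; in google-cloud-storage,
            # upload_from_file is a method of a Blob (bucket.blob(name)).
            google_client.upload_from_file(self.read_pipe)
        except Exception as e:
            self.upload_error = e
        finally:
            self.read_pipe.close()
            # Tell the IOLoop thread we're finished, even if the upload
            # failed, so put() never waits forever.
            self.io_loop.add_callback(self.upload_done.set)

    async def data_received(self, chunk):
        # Awaiting the write gives flow control back to Tornado.
        await self.write_pipe.write(chunk)

    async def put(self):  # or post()
        # Closing the write end signals EOF to the upload thread.
        self.write_pipe.close()
        await self.upload_done.wait()
        self.thread.join()
        if self.upload_error is not None:
            raise self.upload_error
        self.render("upload_done.html")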
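To see the pipe handoff on its own, without Tornado or Google's client, here is a minimal stdlib sketch. It assumes asyncio can stand in for the IOLoop (Tornado 5+ runs on asyncio), uses loop.call_soon_threadsafe in place of IOLoop.add_callback, and replaces the real upload with a hypothetical fake_upload_from_file that just reads the file object until EOF. The writes here are plain blocking writes for brevity, which PipeIOStream avoids in the real handler.

```python
import asyncio
import hashlib
import os
import threading


def fake_upload_from_file(file_obj):
    """Hypothetical stand-in for a blocking uploader such as
    Blob.upload_from_file: consumes the file object until EOF."""
    digest = hashlib.sha256()
    total = 0
    while True:
        block = file_obj.read(8192)
        if not block:  # b"" means the writing end was closed
            break
        digest.update(block)
        total += len(block)
    return total, digest.hexdigest()


async def stream_through_pipe(chunks):
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    result = {}
    r, w = os.pipe()
    read_pipe = os.fdopen(r, "rb")  # the "file-like object" for the uploader

    def upload_thread():
        try:
            result["value"] = fake_upload_from_file(read_pipe)
        finally:
            read_pipe.close()
            # asyncio.Event is not thread-safe, so schedule set() on the
            # loop thread, like IOLoop.add_callback in the handler.
            loop.call_soon_threadsafe(done.set)

    thread = threading.Thread(target=upload_thread)
    thread.start()
    # Each iteration plays the role of data_received. These writes block
    # until the reader drains the pipe; PipeIOStream makes them non-blocking.
    with os.fdopen(w, "wb") as write_pipe:
        for chunk in chunks:
            write_pipe.write(chunk)
    # Leaving the with-block closes the write end, which the reader sees
    # as EOF (the equivalent of write_pipe.close() in put()).
    await done.wait()
    thread.join()
    return result["value"]


chunks = [b"a" * 100_000, b"b" * 50_000, b"c"]
size, sha = asyncio.run(stream_through_pipe(chunks))
print(size)  # 150001
```

The total is larger than a typical pipe buffer, so the writer only finishes because the reader thread drains the pipe concurrently; that is the same property the Tornado version relies on.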

Alternatively, you could avoid Google's synchronous libraries and do everything using the underlying HTTP APIs and AsyncHTTPClient. Sorting out authentication this way is tricky, but you avoid the threading mismatch. This would involve using a body_producer as in this gist.
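A rough sketch of the body_producer approach, assuming Tornado 6 and using a local fake storage server so it runs without credentials. The handler pushes each chunk into a bounded tornado.queues.Queue, and the body_producer passed to AsyncHTTPClient pulls chunks from it and writes them upstream. Everything GCS-specific is an assumption left out here: the real target would be the JSON API upload endpoint (uploadType=media) plus an Authorization header. Tornado's HTTPRequest docs note that body_producer isn't supported by curl_httpclient and that, without a Content-Length, the request is sent with chunked encoding, which many servers don't accept; so this sketch forwards the client's Content-Length. FakeStorageHandler, ForwardingHandler, and serve are hypothetical names.

```python
import asyncio

import tornado.httpclient
import tornado.httpserver
import tornado.queues
import tornado.web
from tornado.testing import bind_unused_port

_EOF = object()  # sentinel: no more request body


def make_body_producer(queue):
    # AsyncHTTPClient calls this with a `write` function; each write()
    # returns a Future, so awaiting it gives flow control.
    async def body_producer(write):
        while True:
            chunk = await queue.get()
            if chunk is _EOF:
                return
            await write(chunk)
    return body_producer


@tornado.web.stream_request_body
class ForwardingHandler(tornado.web.RequestHandler):
    def initialize(self, upstream_url):
        self.upstream_url = upstream_url

    def prepare(self):
        # A bounded queue applies backpressure to data_received.
        self.queue = tornado.queues.Queue(maxsize=4)
        headers = {}
        # Forward Content-Length so the upstream request isn't chunked.
        if "Content-Length" in self.request.headers:
            headers["Content-Length"] = self.request.headers["Content-Length"]
        # Assumption: a real GCS request would also need an Authorization
        # header and the upload endpoint URL; both are omitted here.
        self.upstream = tornado.httpclient.AsyncHTTPClient().fetch(
            self.upstream_url, method="PUT", headers=headers,
            body_producer=make_body_producer(self.queue))

    async def data_received(self, chunk):
        await self.queue.put(chunk)

    async def put(self):
        await self.queue.put(_EOF)
        response = await self.upstream
        self.write(response.body)


@tornado.web.stream_request_body
class FakeStorageHandler(tornado.web.RequestHandler):
    """Hypothetical stand-in for the storage service: counts bytes."""

    def prepare(self):
        self.received = 0

    def data_received(self, chunk):
        self.received += len(chunk)

    def put(self):
        self.write(str(self.received))


def serve(app):
    sock, port = bind_unused_port()
    server = tornado.httpserver.HTTPServer(app)
    server.add_sockets([sock])
    return server, port


async def main(payload):
    upstream, upstream_port = serve(
        tornado.web.Application([(r"/o", FakeStorageHandler)]))
    proxy, proxy_port = serve(tornado.web.Application([
        (r"/upload", ForwardingHandler,
         {"upstream_url": f"http://127.0.0.1:{upstream_port}/o"}),
    ]))
    response = await tornado.httpclient.AsyncHTTPClient().fetch(
        f"http://127.0.0.1:{proxy_port}/upload", method="PUT", body=payload)
    upstream.stop()
    proxy.stop()
    return response.body.decode()


result = asyncio.run(main(b"x" * 300_000))
print(result)
```

The queue's maxsize is what keeps memory bounded: when the upstream connection is slower than the client, data_received waits on queue.put, and Tornado stops reading from the client socket until there is room again.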

answered Nov 03 '22 04:11 by Ben Darnell
