I'm trying to stream a binary file from a client request to Google Cloud Storage through my server.
I'm using the Tornado framework to stream the data from the request to the server, and the Google Cloud Storage API's upload_from_file method to stream the file to Google.
I'm new to Tornado, and I'm using the @stream_request_body decorator so I can get the data from the request in chunks and upload each chunk to Google.
I've tried to open a file-like object to which I can write each chunk while uploading the 'file' itself to Google.
The problem is that I can't upload the 'file' to Google before I start writing chunks to it.
Any assistance would be much appreciated.
Doing this with Google's HTTP libraries is tricky because they're designed for synchronous use. You need to put the actual upload on another thread to avoid blocking the IOLoop. You can use an os.pipe to communicate between the Tornado thread and the upload thread (wrap the writing end of the pipe in a PipeIOStream and the reading end in os.fdopen). Here's an untested sketch of a solution:
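import os
import threading

import tornado.ioloop
import tornado.iostream
import tornado.locks
import tornado.web


# UploadHandler is an illustrative name for your own handler class.
@tornado.web.stream_request_body
class UploadHandler(tornado.web.RequestHandler):
    def prepare(self):
        r, w = os.pipe()
        self.write_pipe = tornado.iostream.PipeIOStream(w)
        # Create our "file-like object" for upload_from_file.
        # Open it in binary mode, because the uploaded file is binary.
        self.read_pipe = os.fdopen(r, "rb")
        # Create an event for the upload thread to communicate back
        # to tornado when it's done, and save a reference to our IOLoop.
        self.upload_done = tornado.locks.Event()
        self.io_loop = tornado.ioloop.IOLoop.current()
        # Consider using a tornado.locks.Semaphore to limit the number of
        # threads you can create.
        self.thread = threading.Thread(target=self.upload_file)
        self.thread.start()

    def upload_file(self):
        # Runs on the upload thread. google_client stands in for whatever
        # object exposes upload_from_file in your code.
        try:
            google_client.upload_from_file(self.read_pipe)
        finally:
            self.read_pipe.close()
            # Tell the IOLoop thread we're finished, even if the upload failed.
            self.io_loop.add_callback(self.upload_done.set)

    async def data_received(self, chunk):
        await self.write_pipe.write(chunk)

    async def put(self):  # or post()
        # Closing the write end signals EOF to the upload thread.
        self.write_pipe.close()
        await self.upload_done.wait()
        self.thread.join()
        self.render("upload_done.html")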
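To see the pipe handoff in isolation, here is a minimal standard-library sketch with no Tornado or Google code involved. The names fake_upload_from_file and stream_through_pipe are hypothetical, and the fake uploader only stands in for a blocking call that reads a file-like object until EOF. In the real handler the writing end would be a PipeIOStream rather than a blocking file object.

```python
import os
import threading


def fake_upload_from_file(file_obj, sink):
    """Hypothetical stand-in for a blocking upload: read until EOF."""
    while True:
        data = file_obj.read(4096)
        if not data:  # EOF: the writing end of the pipe was closed
            break
        sink.append(data)


def stream_through_pipe(chunks):
    """Feed chunks into a pipe while another thread consumes them."""
    r, w = os.pipe()
    reader = os.fdopen(r, "rb")  # binary mode on both ends
    writer = os.fdopen(w, "wb")
    received = []
    thread = threading.Thread(
        target=fake_upload_from_file, args=(reader, received)
    )
    thread.start()  # the consumer starts before any data is written
    for chunk in chunks:
        writer.write(chunk)
    writer.close()  # flushes buffered data, then signals EOF to the reader
    thread.join()
    reader.close()
    return b"".join(received)


if __name__ == "__main__":
    data = stream_through_pipe([b"abc", b"\x00\x01\x02", b"x" * 100000])
    print(len(data))
```

The point of the sketch is the ordering: the consumer thread is already reading when the first chunk arrives, which is what lets the upload begin before the whole file exists.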
Alternatively, you could avoid Google's synchronous libraries and do everything with the underlying HTTP APIs and AsyncHTTPClient. Sorting out authentication this way is tricky, but you avoid the threading mismatch. This would involve using a body_producer, as in this gist.
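A rough sketch of the producer side of that approach, using only asyncio so it runs without Tornado. As I understand Tornado's HTTPRequest, body_producer is a callable that receives a write function and returns an awaitable. The names relay, fake_write and demo, and the use of a None sentinel, are assumptions made for illustration. With Tornado you would presumably pass something like functools.partial(relay, queue) as body_producer, and the upload URL and auth headers would be yours to supply.

```python
import asyncio


async def relay(queue, write):
    """Drain chunks from the queue into write() until a None sentinel."""
    while True:
        chunk = await queue.get()
        if chunk is None:  # sentinel: the request body is complete
            break
        await write(chunk)


async def demo():
    # A bounded queue gives back-pressure between receiving and sending.
    queue = asyncio.Queue(maxsize=4)
    sent = []

    async def fake_write(chunk):
        # Hypothetical stand-in for the write function the HTTP client passes.
        sent.append(chunk)

    producer = asyncio.ensure_future(relay(queue, fake_write))
    for chunk in (b"part-1", b"part-2", b"part-3"):
        await queue.put(chunk)  # roughly what data_received would do
    await queue.put(None)  # roughly what put() or post() would do
    await producer
    return b"".join(sent)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Everything here stays on the event loop, so no thread or pipe is needed; the cost is that you handle the storage service's HTTP API and authentication yourself.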