
Twisted big files transfer

I'm writing a client-server application structured like this: client (C#) <-> server (Twisted; FTP proxy plus additional functionality) <-> FTP server

The server has two classes: my own protocol class, which inherits from LineReceiver, and FTPClient from twisted.protocols.ftp.

But when the client sends or receives big files (10 GB - 20 GB), the server raises MemoryError. I don't use any buffers in my code. I think it happens because, after a call to transport.write(data), the data is appended to the internal buffer of the reactor's writers (correct me if I'm wrong).

What should I use to avoid this problem? Or should I change my approach?

I found out that for big streams I should use the IConsumer and IProducer interfaces. But in the end they will still invoke the transport.write method, so the effect will be the same. Or am I wrong?

UPD:

Here is the logic of file download/upload (from FTP through the Twisted server to the client on Windows):

The client sends some headers to the Twisted server and then begins sending the file. The Twisted server receives the headers and then (if needed) invokes setRawMode(), opens an FTP connection, receives/sends bytes from/to the client, and finally closes the connections. Here is the part of the code that uploads files:

FTPManager class

def _ftpCWDSuccees(self, protocol, fileName):
        self._ftpClientAsync.retrieveFile(fileName, FileReceiver(protocol))



class FileReceiver(Protocol):
    def __init__(self, proto):
        self.__proto = proto

    def dataReceived(self, data):
        self.__proto.transport.write(data)

    def connectionLost(self, why = connectionDone):
        self.__proto.connectionLost(why)

Main proxy-server class:

class SSDMProtocol(LineReceiver):
    ...

After the SSDMProtocol object (call it obSSDMProtocol) parses the headers, it invokes a method that opens an FTP connection (FTPClient from twisted.protocols.ftp), sets the FTPManager object's _ftpClientAsync field, and calls _ftpCWDSuccees(self, protocol, fileName) with protocol = obSSDMProtocol. When the file's bytes are received, dataReceived(self, data) of the FileReceiver object is invoked.

When self.__proto.transport.write(data) is invoked, data is appended to the internal buffer faster than it is sent back to the client, so memory runs out. Maybe I can stop reading when the buffer reaches a certain size and resume reading after the buffer has been fully sent to the client? Or something like that?

Denis Nikanorov asked Dec 09 '22 20:12


1 Answer

If you're passing a 20 gigabyte (gigabit?) string to transport.write, you're going to need at least 20 gigabytes (gigabits?) of memory - probably more like 40 or 60 due to the extra copying necessary when dealing with strings in Python.

Even if you never pass a single string to transport.write that is 20 gigabytes (gigabits?), if you repeatedly call transport.write with short strings at a rate faster than your network can handle, the send buffer will eventually grow too large to fit in memory and you'll encounter a MemoryError.
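
As a back-of-the-envelope sketch of that second failure mode: if bytes arrive at one rate and leave at a slower one, an unthrottled send buffer grows by the difference every second. The function name and all figures below are hypothetical, chosen only to illustrate the arithmetic.

```python
def seconds_until_exhausted(in_rate, out_rate, memory_budget):
    """Seconds until an unthrottled send buffer exceeds memory_budget bytes.

    Rates are in bytes per second. Returns None when the network keeps up,
    because the buffer then does not grow at all.
    """
    growth = in_rate - out_rate  # bytes added to the buffer each second
    if growth <= 0:
        return None
    return memory_budget / growth


MB = 1024 * 1024

# Hypothetical figures: the FTP server delivers 100 MB/s, the client accepts
# 10 MB/s, and the proxy process can spare 2048 MB for buffering.
print(seconds_until_exhausted(100 * MB, 10 * MB, 2048 * MB))
```

Under these assumed rates the buffer outgrows its budget well before a multi-gigabyte transfer completes, no matter how small each individual write is.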

The solution to both of these problems is the producer/consumer system. The advantage that using IProducer and IConsumer gives you is that you'll never have a 20 gigabyte (gigabit?) string and you'll never fill up a send buffer with too many shorter strings. The network will be throttled so that bytes are not read faster than your application can deal with them and forget about them. Your strings will end up on the order of 16kB - 64kB, which should easily fit in memory.

You just need to adjust your use of FileReceiver to include registration of the incoming connection as a producer for the outgoing connection:

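from twisted.internet.protocol import Protocol

class FileReceiver(Protocol):
    def __init__(self, outgoing):
        self._outgoing = outgoing

    def connectionMade(self):
        # The incoming transport is a push producer; the outgoing transport
        # pauses and resumes it as its own send buffer fills and drains.
        self._outgoing.transport.registerProducer(self.transport, streaming=True)

    def dataReceived(self, data):
        self._outgoing.transport.write(data)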

Now whenever self._outgoing.transport's send buffer fills up, it will tell self.transport to pause. Once the send buffer empties out, it will tell self.transport to resume. self.transport knows how to undertake these actions at the TCP level, so that data coming into your server will also be slowed down.
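
The pause/resume handshake can be illustrated with a toy model that uses no Twisted at all. This is only a sketch: ToyConsumer, ToyProducer, relay, the high-water mark, and every number are hypothetical stand-ins, not Twisted's implementation. It borrows just the method names registerProducer, pauseProducing, and resumeProducing to show why the buffer stays bounded.

```python
class ToyConsumer:
    """Stands in for the outgoing transport: buffers writes, drains slowly."""

    def __init__(self, high_water):
        self.high_water = high_water
        self.buffer = bytearray()
        self.peak = 0          # largest buffer size seen so far
        self.producer = None

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def write(self, data):
        self.buffer += data
        self.peak = max(self.peak, len(self.buffer))
        # Buffer is full enough: ask the producer to stop pushing data.
        if self.producer is not None and len(self.buffer) >= self.high_water:
            self.producer.pauseProducing()

    def drain(self, nbytes):
        """Simulate the network accepting up to nbytes from the buffer."""
        del self.buffer[:nbytes]
        # Buffer is empty: the producer may push data again.
        if self.producer is not None and not self.buffer:
            self.producer.resumeProducing()


class ToyProducer:
    """Stands in for the incoming transport: pushes chunks unless paused."""

    def __init__(self, total, chunk):
        self.remaining = total
        self.chunk = chunk
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False

    def produce(self, consumer):
        if self.paused or self.remaining == 0:
            return
        n = min(self.chunk, self.remaining)
        self.remaining -= n
        consumer.write(b"x" * n)


def relay(total, chunk, drain_rate, high_water, backpressure):
    """Relay `total` bytes and return the peak buffer size in bytes."""
    consumer = ToyConsumer(high_water)
    producer = ToyProducer(total, chunk)
    if backpressure:
        consumer.registerProducer(producer, True)
    while producer.remaining or consumer.buffer:
        producer.produce(consumer)    # fast side: one chunk per tick
        consumer.drain(drain_rate)    # slow side: a few bytes per tick
    return consumer.peak


unbounded = relay(1_000_000, 10_000, 1_000, 64_000, backpressure=False)
bounded = relay(1_000_000, 10_000, 1_000, 64_000, backpressure=True)
print(unbounded, bounded)
```

With these assumed numbers, the peak without a registered producer approaches the size of the whole transfer, while the peak with one stays close to the high-water mark, independent of how large the file is.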

Jean-Paul Calderone answered Jan 17 '23 16:01