I am writing a client-server application structured like this: client (C#) <-> server (Twisted; an FTP proxy with additional functionality) <-> FTP server.
The server has two classes: my own protocol class, inherited from the LineReceiver protocol, and FTPClient from twisted.protocols.ftp.
But when the client sends or receives big files (10 Gb - 20 Gb), the server raises a MemoryError. I don't use any buffers in my code. It happens when, after a call to transport.write(data), the data is appended to the internal buffer of the reactor's writers (correct me if I'm wrong).
What should I use to avoid this problem? Or should I change my approach?
I found out that for big streams I should use the IConsumer and IProducer interfaces. But in the end they will still invoke the transport.write method, and the effect will be the same. Or am I wrong?
UPD:
Here is the logic of file download/upload (from the FTP server through the Twisted server to the client on Windows):
The client sends some headers to the Twisted server and then begins sending the file. The Twisted server receives the headers and then, if needed, invokes setRawMode(), opens an FTP connection, receives/sends bytes from/to the client, and finally closes the connections. Here is the part of the code that uploads files:
FTPManager class:

    def _ftpCWDSuccees(self, protocol, fileName):
        self._ftpClientAsync.retrieveFile(fileName, FileReceiver(protocol))

    class FileReceiver(Protocol):
        def __init__(self, proto):
            self.__proto = proto

        def dataReceived(self, data):
            self.__proto.transport.write(data)

        def connectionLost(self, why = connectionDone):
            self.__proto.connectionLost(why)
Main proxy-server class:

    class SSDMProtocol(LineReceiver):
        ...
After the SSDMProtocol object (call it obSSDMProtocol) parses the headers, it invokes a method that opens an FTP connection (FTPClient from twisted.protocols.ftp), stores it in the _ftpClientAsync field of the FTPManager object, and calls _ftpCWDSuccees(self, protocol, fileName) with protocol = obSSDMProtocol. When the file's bytes are received, dataReceived(self, data) of the FileReceiver object is invoked.
When self.__proto.transport.write(data) is invoked, data is appended to the internal buffer faster than it is sent to the client, so memory runs out. Maybe I could stop reading when the buffer reaches a certain size and resume reading once the buffer has been fully sent to the client? Or something like that?
If you're passing a 20 gigabyte (gigabit?) string to transport.write, you're going to need at least 20 gigabytes (gigabits?) of memory - probably more like 40 or 60 due to the extra copying necessary when dealing with strings in Python.
Even if you never pass a single string to transport.write that is 20 gigabytes (gigabits?), if you repeatedly call transport.write with short strings at a rate faster than your network can handle, the send buffer will eventually grow too large to fit in memory and you'll encounter a MemoryError.
The solution to both of these problems is the producer/consumer system. The advantage that using IProducer and IConsumer gives you is that you'll never have a 20 gigabyte (gigabit?) string and you'll never fill up a send buffer with too many shorter strings. The network will be throttled so that bytes are not read faster than your application can deal with them and forget about them. Your strings will end up on the order of 16kB - 64kB, which should easily fit in memory.
You just need to adjust your use of FileReceiver to include registration of the incoming connection as a producer for the outgoing connection:
    class FileReceiver(Protocol):
        def __init__(self, outgoing):
            self._outgoing = outgoing

        def connectionMade(self):
            self._outgoing.transport.registerProducer(self.transport, streaming=True)

        def dataReceived(self, data):
            self._outgoing.transport.write(data)
Now whenever self._outgoing.transport's send buffer fills up, it will tell self.transport to pause. Once the send buffer empties out, it will tell self.transport to resume. self.transport knows how to undertake these actions at the TCP level so that data coming into your server will also be slowed down.
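To make the pause/resume behaviour concrete, here is a toy model in plain Python. It does not use Twisted at all: ToyProducer and ToyConsumer are hypothetical stand-ins for the incoming and outgoing transports, and the chunk size, drain rate, and high-water mark are made-up numbers chosen only for illustration. It is a rough sketch of the idea, not a description of how Twisted's transports are implemented internally.

```python
class ToyProducer:
    """Hypothetical stand-in for the incoming (FTP data) transport."""

    def __init__(self, total_chunks, chunk_size):
        self.remaining = total_chunks
        self.chunk_size = chunk_size
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False

    def read(self):
        """Return the next chunk, or None if paused or exhausted."""
        if self.paused or self.remaining == 0:
            return None
        self.remaining -= 1
        return b"x" * self.chunk_size


class ToyConsumer:
    """Hypothetical stand-in for the outgoing transport and its send buffer."""

    def __init__(self, high_water, drain_per_tick):
        self.high_water = high_water          # assumed pause threshold
        self.drain_per_tick = drain_per_tick  # assumed network speed
        self.buffered = 0
        self.peak = 0
        self.sent = 0
        self.producer = None

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def write(self, data):
        # Only the length is tracked; a real transport keeps the bytes.
        self.buffered += len(data)
        self.peak = max(self.peak, self.buffered)
        if self.producer is not None and self.buffered >= self.high_water:
            self.producer.pauseProducing()

    def drain(self):
        """Simulate the network accepting some of the buffered bytes."""
        n = min(self.buffered, self.drain_per_tick)
        self.buffered -= n
        self.sent += n
        if self.producer is not None and self.buffered == 0:
            self.producer.resumeProducing()


def run(register):
    """Push 1000 chunks of 64 KiB through a link that drains 16 KiB per tick."""
    producer = ToyProducer(total_chunks=1000, chunk_size=64 * 1024)
    consumer = ToyConsumer(high_water=256 * 1024, drain_per_tick=16 * 1024)
    if register:
        consumer.registerProducer(producer, streaming=True)
    while producer.remaining or consumer.buffered:
        chunk = producer.read()
        if chunk is not None:
            consumer.write(chunk)
        consumer.drain()
    return consumer.peak, consumer.sent


if __name__ == "__main__":
    for register in (False, True):
        peak, sent = run(register)
        print("registered=%s peak buffer=%d bytes, sent=%d bytes"
              % (register, peak, sent))
```

In this model the same number of bytes reaches the client either way, but without registration the buffer keeps growing for as long as the source outpaces the network, while with registration it never exceeds the high-water mark.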