
HTTP continuous packeted stream with Indy

I have a JSON-RPC service which for one of the requests returns a continuous stream of JSON objects.

For example:

{id:'1'}
{id:'2'}
//30 minutes of no data
{id:'3'}
//...

Of course, there's no Content-Length because the stream is endless.

I'm using a custom TStream descendant to receive and parse the data. But internally TIdHTTP buffers the data and does not pass it to me until RecvBufferSize bytes have been received.

This results in:

{id:'1'} //received
{id:'2'} //buffered by Indy but not received
//30 minutes of no data
{id:'3'} //this is where Indy commits {id:'2'} to me

Obviously this won't do because the message which mattered 30 minutes ago should have been delivered 30 minutes ago.

I'd like Indy to do just what sockets do: read up to RecvBufferSize bytes, or fewer if that is all that's available, and return immediately.

I've found this discussion from 2005 where some poor soul tried to explain the problem to Indy developers but they didn't understand him. (Read it; it's a sad sight)

Anyway, he worked around this by writing a custom IOHandler descendant, but that was back in 2005. Are there any ready-made solutions today?

asked Mar 25 '13 12:03 by himself


3 Answers

Sounds to me like a WebSocket task, since your connection is no longer plain request/response HTTP, but a stream of content.

See WebSocket server implementations for Delphi for some code.

There is at least one based on Indy, from the author of AsmProfiler.

AFAIK there are two kinds of streams in WebSockets: binary and text. I suspect your JSON stream is text content, from the WebSocket point of view.

Another option is to use long polling or some older protocols, which are more router-friendly. Once the connection switches to WebSocket mode it is no longer standard HTTP, so some "sensitive" packet-inspection tools (on a corporate network) may identify it as a security attack (e.g. DoS) and drop the connection.

answered Nov 07 '22 09:11 by Arnaud Bouchez


You do not need to write an IOHandler descendant; this is already possible with the TIdTCPClient class. It exposes a TIdIOHandler object, which has methods to read from the socket. These ReadXXX methods block until the requested data has been read or a timeout occurs. As long as the connection exists, ReadXXX can be executed in a loop, and whenever it receives a new JSON object, it can pass it to the application logic.

In your example, every JSON object appears to fit on one line. JSON objects can span multiple lines, however, and in that case the client code needs to know how they are separated.
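A minimal sketch of such a loop, assuming one JSON object per line. The host, port, request path and HandleJsonObject are placeholders, not part of the original service. The request is sent as HTTP/1.0 on the assumption that the server then answers without chunked transfer encoding, so the body can be read line by line.

```pascal
program JsonLineClient;
{$APPTYPE CONSOLE}

uses
  SysUtils, IdTCPClient, IdException;

// Hypothetical application callback: receives one JSON object per call.
procedure HandleJsonObject(const AJson: string);
begin
  Writeln('received: ', AJson);
end;

var
  Client: TIdTCPClient;
  Line: string;
begin
  Client := TIdTCPClient.Create(nil);
  try
    Client.Host := 'example.com';   // placeholder host
    Client.Port := 80;
    Client.Connect;

    // Hand-written request; the path is a placeholder.
    Client.IOHandler.WriteLn('GET /stream HTTP/1.0');
    Client.IOHandler.WriteLn('Host: example.com');
    Client.IOHandler.WriteLn;

    // Skip the status line and response headers (they end with an empty line).
    repeat
      Line := Client.IOHandler.ReadLn;
    until Line = '';

    try
      // ReadLn blocks until a full line has arrived, so each object is
      // delivered as soon as its line terminator is received.
      while True do
      begin
        Line := Client.IOHandler.ReadLn;
        if Line <> '' then
          HandleJsonObject(Line);
      end;
    except
      on EIdConnClosedGracefully do
        ; // the server closed the stream
    end;
  finally
    Client.Free;
  end;
end.
```

If the objects can span several lines, ReadLn is not enough and the loop would need a different framing rule, for example counting braces.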


Update: in a similar Stack Overflow question (for .NET) about a 'streaming' HTTP JSON web service, the most upvoted solution used a lower-level TCP client instead of an HTTP client: Reading data from an open HTTP stream

answered Nov 07 '22 09:11 by mjn


While using a TCP stream was an option, in the end I went with the original solution of writing a custom TIdIOHandlerStack descendant.

The motivation was that with TIdHTTP I know what doesn't work and only need to fix that, while switching to lower level TCP means new problems can arise.

Here's the code that I'm using, and I'm going to discuss the key points here.

The new TIdStreamIoHandler class has to inherit from TIdIOHandlerStack.

Two methods are needed: TryReadBytes, a modified copy of ReadBytes, and an overridden ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
  AAppend: Boolean = True): integer; virtual;
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1;
  AReadUntilDisconnect: Boolean = False); override;

Both are modified Indy functions which can be found in IdIOHandler.TIdIOHandler. In the copy of ReadBytes, the while loop has to be replaced with a single ReadFromSource() request, so that TryReadBytes returns after reading up to AByteCount bytes in one go.

Based on this, ReadStream has to handle all combinations of AByteCount (>0, <0) and AReadUntilDisconnect (true, false), repeatedly reading chunks of data as they arrive from the socket and writing them to the stream.

Note that ReadStream need not terminate prematurely even in this stream version if only part of the requested data is available in the socket. It just has to write that part to the stream instantly instead of caching it in FInputBuffer, then block and wait for the next part of data.
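Roughly, the two methods could look like the sketch below. This is not the exact code linked above, only an illustration of the idea against Indy 10, with several simplifications that are my assumptions: the length-prefixed case (AByteCount < 0 without AReadUntilDisconnect) is passed to the inherited method, AByteCount is ignored when AReadUntilDisconnect is set, and the OnWork events are not fired.

```pascal
unit StreamIoHandlerSketch;

interface

uses
  Classes, IdGlobal, IdException, IdIOHandlerStack;

type
  TIdStreamIoHandler = class(TIdIOHandlerStack)
  public
    function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
      AAppend: Boolean = True): Integer; virtual;
    procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1;
      AReadUntilDisconnect: Boolean = False); override;
  end;

implementation

function TIdStreamIoHandler.TryReadBytes(var VBuffer: TIdBytes;
  AByteCount: Integer; AAppend: Boolean): Integer;
begin
  Result := 0;
  if AByteCount = 0 then
    Exit;
  // A single read instead of Indy's "loop until AByteCount bytes" logic.
  if InputBuffer.Size = 0 then
  begin
    ReadFromSource(False);
    CheckForDisconnect(True, True);
  end;
  // Hand over whatever is available, capped at AByteCount when it is positive.
  Result := InputBuffer.Size;
  if (AByteCount > 0) and (Result > AByteCount) then
    Result := AByteCount;
  if Result > 0 then
    InputBuffer.ExtractToBytes(VBuffer, Result, AAppend);
end;

procedure TIdStreamIoHandler.ReadStream(AStream: TStream;
  AByteCount: TIdStreamSize; AReadUntilDisconnect: Boolean);
var
  Buf: TIdBytes;
  Remaining: TIdStreamSize;
  Want, Got: Integer;
begin
  // Simplification: leave the length-prefixed case to the stock implementation.
  if (AByteCount < 0) and not AReadUntilDisconnect then
  begin
    inherited ReadStream(AStream, AByteCount, AReadUntilDisconnect);
    Exit;
  end;

  Remaining := AByteCount;
  try
    while AReadUntilDisconnect or (Remaining > 0) do
    begin
      Want := RecvBufferSize;
      if (not AReadUntilDisconnect) and (Remaining < Want) then
        Want := Integer(Remaining);
      Got := TryReadBytes(Buf, Want, False);
      if Got > 0 then
      begin
        // Write the chunk immediately instead of waiting for a full buffer.
        AStream.WriteBuffer(Buf[0], Got);
        Dec(Remaining, Got);
      end;
    end;
  except
    on EIdConnClosedGracefully do
      if not AReadUntilDisconnect then
        raise; // closed before the requested byte count arrived
  end;
end;

end.
```

To use it, assign an instance to the HTTP client before the request, e.g. HTTP.IOHandler := TIdStreamIoHandler.Create(HTTP). Since it descends from TIdIOHandlerStack, this sketch covers plain HTTP only, not SSL.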

answered Nov 07 '22 10:11 by himself