
Concurrent processing of single InputStream with independent consumers

I need to spawn N consumer threads that process the same InputStream concurrently, e.g. transform it somehow, calculate a checksum or a digital signature, etc. These consumers do not depend on each other, and all of them use third-party libraries that accept an InputStream as the source of data.

So what I can do is create some implementation of InputStream which will:

  • read a chunk of data from the "parent" stream
  • unblock the consumers
  • wait until every consumer has read the whole chunk
  • read the next chunk

While this looks simple, it may raise various problems: livelock when a certain consumer dies, having to implement all of the InputStream methods, controlling the fork/join of the consumers themselves using barriers/latches, etc.

One buddy told me that it would take half an hour to implement; it made my evening.

I'd prefer to either use something mature enough (googling didn't turn up any results; maybe my google-fu isn't good enough?) or not bother and copy the entire "source" stream into a temporary file and use that as the source of data. The latter solution seems more reliable, but it may end up creating gigabyte-sized files (when processing streaming audio, for example).
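For reference, the temporary-file fallback could be sketched roughly as follows. This is only a minimal sketch under assumptions: `TempFileFanOut`, `StreamConsumer` and `process` are hypothetical names made up for illustration, the source stream is assumed to be finite, and each consumer is assumed to be expressible as "take an InputStream, return a result".

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class TempFileFanOut {
    /** Hypothetical consumer shape: reads a stream and returns some result. */
    interface StreamConsumer<T> {
        T consume(InputStream in) throws Exception;
    }

    /** Spools the source to a temp file, then lets every consumer read its own copy of the stream. */
    static <T> List<T> process(InputStream source, List<StreamConsumer<T>> consumers) throws Exception {
        Path tmp = Files.createTempFile("fanout-", ".bin");
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, consumers.size()));
        try {
            // The whole source is written to disk before any consumer starts.
            Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);

            List<Future<T>> futures = new ArrayList<>();
            for (StreamConsumer<T> consumer : consumers) {
                futures.add(pool.submit(() -> {
                    // Each consumer opens an independent stream over the same file.
                    try (InputStream in = Files.newInputStream(tmp)) {
                        return consumer.consume(in);
                    }
                }));
            }

            List<T> results = new ArrayList<>();
            for (Future<T> f : futures) {
                results.add(f.get()); // rethrows a consumer failure as ExecutionException
            }
            return results;
        } finally {
            pool.shutdownNow();
            Files.deleteIfExists(tmp);
        }
    }
}
```

A dying consumer cannot stall the others here, because no consumer ever waits on another. The price is the disk space mentioned above, plus the fact that no consumer starts until the whole source has been copied.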

jdevelop asked Jul 04 '12 20:07


1 Answer

The way I see it, you should have at least some kind of buffering so that different consumers can move through the stream at a different pace without everything being constantly bogged down by the currently slowest consumer. Without buffering, you basically get worst-case performance and very little benefit from concurrency.

You could, say, tag each chunk with the consumers that have used it so far and then delete those that are completely used up. Maybe this could be achieved by each consumer holding a reference to each chunk it hasn't yet used, which would allow GC to automatically take care of used chunks. The producer might keep a list of WeakReferences to the chunks so it has a handle on the number of chunks yet to be used and base its throttling on that.
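One way to picture the "each consumer holds a reference" idea is a singly linked chain of chunks, sketched below. All names here (`ChunkChain`, `Node`, `ChainStream`, `liveChunks`) are hypothetical and exist only for this illustration. The producer keeps only the tail, each consumer stream keeps only its current node, so a chunk becomes unreachable once every consumer has moved past it. The `liveChunks()` count built on WeakReferences is only an upper bound, since the GC clears references whenever it likes; actual throttling is left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChunkChain {
    /** One link: a chunk of data plus a pointer to the next link, set later by the producer. */
    static final class Node {
        final byte[] data;
        private Node next;     // guarded by this
        private boolean last;  // guarded by this; true when no further node will follow

        Node(byte[] data) { this.data = data; }

        synchronized void link(Node n) { next = n; notifyAll(); }
        synchronized void markLast() { last = true; notifyAll(); }

        /** Blocks until a successor is linked; returns null at end of stream. */
        synchronized Node awaitNext() throws InterruptedException {
            while (next == null && !last) wait();
            return next;
        }
    }

    // Only the tail is kept, so older nodes stay alive only while some consumer still holds them.
    private Node tail = new Node(new byte[0]);
    private final List<WeakReference<Node>> issued = new ArrayList<>();

    /** The returned stream sees everything appended after this call. */
    synchronized InputStream newReader() { return new ChainStream(tail); }

    synchronized void append(byte[] buf, int len) {
        Node n = new Node(Arrays.copyOf(buf, len));
        issued.add(new WeakReference<>(n));
        tail.link(n);
        tail = n;
    }

    /** Signals end of stream to all readers. */
    synchronized void finish() { tail.markLast(); }

    /** Upper bound on chunks not yet collected; a producer could base throttling on it. */
    synchronized int liveChunks() {
        issued.removeIf(ref -> ref.get() == null);
        return issued.size();
    }

    private static final class ChainStream extends InputStream {
        private Node node;  // the only strong reference this consumer holds into the chain
        private int pos;

        ChainStream(Node start) { node = start; }

        @Override public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            try {
                // Move forward while the current chunk is exhausted; this drops our reference to it.
                while (node != null && pos == node.data.length) {
                    node = node.awaitNext();
                    pos = 0;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while waiting for data");
            }
            if (node == null) return -1;
            int n = Math.min(len, node.data.length - pos);
            System.arraycopy(node.data, pos, b, off, n);
            pos += n;
            return n;
        }
    }
}
```

Readers have to be created before the producer starts appending, otherwise they miss the earlier chunks. Without throttling, a stalled consumer keeps the whole chain from its position onward in memory.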

I am also thinking about having a separate InputStream instance per thread, which internally communicates with the producer InputStream. This way you have an easy solution for your livelock hazard: with try ... finally { is.close(); }, the dying consumer closes its own InputStream, and this is communicated to the producer.

I have some ideas involving an ArrayBlockingQueue per consumer. There would be some difficulty in ensuring that all consumers are properly fed without making the producer either block or busy-wait.
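To make the per-consumer stream and the queue idea more concrete, here is a rough sketch that combines them. The names (`FanOutPump`, `Branch`, `newBranch`, `pump`) are hypothetical, and it does not solve the difficulty mentioned above: the producer still waits when a live consumer's queue is full, and it notices a closed consumer by re-checking a flag on a timed `offer`, which is a mild form of polling.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

final class FanOutPump {
    private static final byte[] EOF = new byte[0];  // end marker, compared by identity
    private final List<Branch> branches = new CopyOnWriteArrayList<>();
    private final int queueCapacity;

    FanOutPump(int queueCapacity) { this.queueCapacity = queueCapacity; }

    /** Call once per consumer, before pump() starts. */
    InputStream newBranch() {
        Branch b = new Branch(queueCapacity);
        branches.add(b);
        return b;
    }

    /** Runs on the producer thread: hands every chunk of the source to every live branch. */
    void pump(InputStream source, int chunkSize) throws IOException, InterruptedException {
        byte[] buf = new byte[chunkSize];
        try {
            int n;
            while ((n = source.read(buf)) != -1) {
                byte[] chunk = Arrays.copyOf(buf, n);  // shared by all branches, never modified
                for (Branch br : branches) br.feed(chunk);
                branches.removeIf(br -> br.closed);    // forget consumers that have gone away
            }
        } finally {
            // Always try to release waiting consumers, even if the source failed.
            for (Branch br : branches) br.feed(EOF);
        }
    }

    private static final class Branch extends InputStream {
        private final BlockingQueue<byte[]> queue;
        private volatile boolean closed;
        private byte[] current = new byte[0];  // chunk being drained; consumer thread only
        private int pos;
        private boolean eof;

        Branch(int capacity) { queue = new ArrayBlockingQueue<>(capacity); }

        /** Producer side: waits for room, but gives up once the consumer has closed. */
        void feed(byte[] chunk) throws InterruptedException {
            while (!closed && !queue.offer(chunk, 100, TimeUnit.MILLISECONDS)) {
                // queue still full: loop to re-check the closed flag
            }
        }

        @Override public int read() throws IOException {
            byte[] one = new byte[1];
            return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (closed) throw new IOException("stream closed");
            if (len == 0) return 0;
            while (!eof && pos == current.length) {
                try {
                    current = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while waiting for data");
                }
                pos = 0;
                eof = (current == EOF);
            }
            if (eof) return -1;
            int n = Math.min(len, current.length - pos);
            System.arraycopy(current, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override public void close() {
            closed = true;  // tells the producer to stop feeding this branch
            queue.clear();  // makes room so a waiting producer can move on
        }
    }
}
```

Each consumer thread would wrap its work in try ... finally { is.close(); } as described above, so a consumer that dies stops holding back the producer after at most one timeout. The queue capacity bounds how far the fastest consumer can run ahead of the slowest one.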

Marko Topolnik answered Oct 13 '22 18:10
