How to create a Thread safe InputStream .During multithreaded operations the inputStream data gets corrupted,so how can i make my inputStream thread safe.will the following code work
public class SynchronizedInputStream extends InputStream{
private InputStream in;
private SynchronizedInputStream( InputStream in ) {
this.in = in;
}
/* ... method for every InputStream type to use */
public static InputStream createInputStream( InputStream in) {
return new SynchronizedInputStream( in);
}
public static InputStream createPushBackInputStream(InputStream in,int BUFSIZE){
return new SynchronizedInputStream(new PushbackInputStream(in,BUFSIZE));
}
/* Wrap all InputStream methods Used */
public int read(){
synchronized (this) {
try {
return in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
return 0;
}
@Override
public int available() {
synchronized( this ) {
try {
return in.available();
} catch (IOException e) {
e.printStackTrace();
}
}
return 0;
}
}
In the NANOHTTPD file
public HTTPSession(TempFileManager tempFileManager, InputStream inputStream, OutputStream outputStream, InetAddress inetAddress) {
this.inputStream=(PushbackInputStream) SynchronizedInputStream.createPushBackInputStream(inputStream);
/*lines of code..........*/
}
Then i call it like this
String Data = readStream(session.getInputStream());//session is HTTPSession
/*.....code....*/
private String readStream(InputStream in) {
synchronized (in) {
PushbackInputStream inputStream = (PushbackInputStream) in;
StringBuffer outputBuffer = null;
try {
//Reading the InputStream Here
}
} catch (IOException ioe) {
//error msg
}
return outputBuffer.toString();
}
}
Note: The ObservableInputStream is not thread safe, as instances of InputStream usually aren't. If you must access the stream from multiple threads, then synchronization, locking, or a similar means must be used.
A DataOutputStream is not safe for use by multiple concurrent threads. If a DataOutputStream is to be used by more than one thread then access to the data output stream should be controlled by appropriate synchronization.
In general you should assume that a class is not thread safe unless it specifically says that it is. GZipInputStream is no exception to this rule.
Writing to a socket by multiple threads is thread-safe as long as the other end can make sense of the intereleaved data. Reading from a socket by multiple threads is thread-safe as long as this end can make sense of the interleaved data.
The short answer is that the class you have shown us is thread-safe, but the code that uses your class probably won't be thread safe!!
What you have implemented are operations that atomically read one character, and atomically test if there is something to be read. The implementation of those operations is thread-safe, if (and only if) all threads use the same SynchronizedInputStream
object to access a given InputStream
, and nothing apart from your wrapper access the InputStream
directly.
However, this most likely that this will not be sufficient to make your application's use of the streams thread-safe in the larger sense.
I expect that the "corruption" that you are observing is actually happening a higher level; e.g. two threads that are simultaneously making a sequence of read
calls to read (say) messages are interleaving so that some bytes of a message are going to the wrong thread. Assuming that that is your problem, then this does not fix it. Your read
method only locks the stream while a thread reads a single byte. After unlocking, there is nothing to stop a different thread from reading the next byte.
There are a few ways to solve this. For example"
A simple way is to restructure your code only one thread ever reads from a given InputStream
. That thread reads the messages, and turns them into objects that can be handed off to others via a queue ... for example.
Another way is to replace your wrapper class with one that reads an entire message atomically. Don't extend InputStream
. Instead design your API in terms of the larger scale operations, and synchronize at that level of granularity.
UPDATE
Re the extra code you added.
It looks like only one thread (the current request thread) should ever be reading from the input stream. If you are only using one thread there should be no issues with multi-threading or thread safety. (And besides, that this the way that the nanoHTTPD code is designed to work.)
Supposing that there were multiple threads, your synchronized (in) {
block in readStream
would normally be sufficient to make the code thread-safe, provided that all all of the threads were using the same in
object.
The problem is that your hacked HttpSession
class is creating a separate SynchronizedInputStream
for each "session", and THAT is what your code synchronizes on. So if (somehow) two threads created HttpSessions
objects using the same socket input stream, they would synchronize on different objects, and there would be no mutual exclusion.
But this is all conjecture. So far, you have not demonstrated that there are multiple threads attempting to use the same input stream.
You need to think about how it would make sense. Imagine more than one people are reading a magic book, which erases the character the first time anyone sees it. So only one person can read any given character. That's kind of how streams are.
This makes it really hard to read the book in a useful manner. When most naively done, each person will just get some random subset of the characters; not very useful information.
One straight forward solution is to let one read it and then copy it onto a book that doesn't erase characters when one reads it. This way everyone can read the book. In some situation, you don't need everyone to understand the book, and people can just work as long as they are given a sentence. In this case, the one reader can post each sentence to a queue from which everyone takes one sentence at a time.
Other approaches include having a buffer where each threads store the character they read, and then check each time if they can form a word, and if so emitting the word for downstream processing. For an example, see Netty's codec package (e.g. this).
These approaches are however usually implemented on top of a stream rather than inside it. You could well have a stream that does these inside, but it will probably confuse people.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With