Am I correct to suppose that, within a single process, two threads reading from and writing to a named pipe do not block the reader or the writer at all? So with unlucky timing it's possible to miss some data?
And in the case of several processes, the reader will wait until some data is available, and the writer will be blocked until the reader has read all the data supplied by the writer?
I am planning to use a named pipe to pass several (tens, hundreds) files from an external process and consume them in my Java application. Writing a simple unit test that uses one thread to write to the pipe and another to read from it resulted in sporadic test failures because of missing data chunks.
I think it's because both threads run in the same process, so my test is not correct in general. Is this assumption correct?
Here is some sort of example which illustrates the case:
    import java.io.{FileOutputStream, FileInputStream, File}
    import java.util.concurrent.Executors
    import org.apache.commons.io.IOUtils
    import org.junit.runner.RunWith
    import org.scalatest.FlatSpec
    import org.scalatest.junit.JUnitRunner

    @RunWith(classOf[JUnitRunner])
    class PipeTest extends FlatSpec {

      def md5sum(data: Array[Byte]) = {
        import java.security.MessageDigest
        MessageDigest.getInstance("MD5").digest(data).map("%02x".format(_)).mkString
      }

      "Pipe" should "block here" in {
        val pipe = new File("/tmp/mypipe")
        val srcData = new File("/tmp/random.10m")
        val md5 = "8e0a24d1d47264919f9d47f5223c913e"
        val executor = Executors.newSingleThreadExecutor()
        executor.execute(new Runnable {
          def run() {
            (1 to 10).foreach {
              id =>
                val fis = new FileInputStream(pipe)
                assert(md5 === md5sum(IOUtils.toByteArray(fis)))
                fis.close()
            }
          }
        })
        (1 to 10).foreach {
          id =>
            val is = new FileInputStream(srcData)
            val os = new FileOutputStream(pipe)
            IOUtils.copyLarge(is, os)
            os.flush()
            os.close()
            is.close()
            Thread.sleep(200)
        }
      }
    }
Without Thread.sleep(200) the test fails.
With this delay set, it works just fine. I am using a file with 10 megabytes of random data.
This is a very simple race condition in your code: you're writing fixed-size messages to the pipe, and assuming that you can read the same messages back. However, you have no idea how much data is available in the pipe for any given read.
If you prefix your writes with the number of bytes written, and ensure that each read only reads that number of bytes, you'll see that pipes work exactly as advertised.
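The length-prefix idea above can be sketched roughly as follows. This is only a minimal illustration, not a definitive implementation: the `Framing` object and its `writeFrame` / `readFrame` helpers are hypothetical names, and the 4-byte big-endian length header is an assumed format that both sides would have to agree on. It relies only on `java.io.DataOutputStream` and `java.io.DataInputStream` from the standard library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical helper, not part of the original post.
object Framing {

  // Write one message: a 4-byte big-endian length header, then the payload.
  def writeFrame(out: DataOutputStream, payload: Array[Byte]): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
    out.flush()
  }

  // Read exactly one message. readInt and readFully block until the requested
  // number of bytes has arrived, and throw EOFException if the stream ends first.
  def readFrame(in: DataInputStream): Array[Byte] = {
    val length = in.readInt()
    val payload = new Array[Byte](length)
    in.readFully(payload)
    payload
  }
}
```

With a named pipe, the writer would presumably wrap `new FileOutputStream(pipe)` in a `DataOutputStream` and the reader would wrap `new FileInputStream(pipe)` in a `DataInputStream`. Because each message then carries its own length, the reader no longer depends on end-of-stream to find message boundaries, so the sleep between writes should not be needed.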
If you have a situation with multiple writers and/or multiple readers, I recommend using an actual message queue. Actually, I recommend using a message queue in any case, as it solves the issue of message boundary demarcation; there's little point in reinventing that particular wheel.
Am I correct to suppose that within the bounds of the same process having 2 threads reading/writing to a named pipe does not block reader/writer at all?
Not unless you are using non-blocking I/O, which you aren't.
So with wrong timings it's possible to miss some data?
Not unless you are using non-blocking I/O, which you aren't.