
Named pipes in Java and multithreading

Am I correct to suppose that, within a single process, having two threads read from and write to a named pipe does not block the reader or writer at all? So with wrong timings it's possible to miss some data?

And in the case of several processes, will the reader wait until some data is available, and will the writer be blocked until the reader has read all the data supplied by the writer?

I am planning to use a named pipe to pass several (tens or hundreds of) files from an external process and consume them in my Java application. I wrote simple unit tests that use one thread to write to the pipe and another to read from it, and they fail sporadically because of missing data chunks.

I think this is because both threads run in the same process, so my test is not valid in general. Is this assumption correct?

Here is an example that illustrates the case:

import java.io.{FileOutputStream, FileInputStream, File}
import java.util.concurrent.Executors
import org.apache.commons.io.IOUtils
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class PipeTest extends FlatSpec {

  def md5sum(data: Array[Byte]) = {
    import java.security.MessageDigest
    MessageDigest.getInstance("MD5").digest(data).map("%02x".format(_)).mkString
  }

  "Pipe" should "block here" in {
    val pipe = new File("/tmp/mypipe")
    val srcData = new File("/tmp/random.10m")
    val md5 = "8e0a24d1d47264919f9d47f5223c913e"
    val executor = Executors.newSingleThreadExecutor()
    executor.execute(new Runnable {
      def run() {
        (1 to 10).foreach {
          id =>
            val fis = new FileInputStream(pipe)
            assert(md5 === md5sum(IOUtils.toByteArray(fis)))
            fis.close()
        }
      }
    })
    (1 to 10).foreach {
      id =>
        val is = new FileInputStream(srcData)
        val os = new FileOutputStream(pipe)
        IOUtils.copyLarge(is, os)
        os.flush()
        os.close()
        is.close()
        Thread.sleep(200)
    }
  }

}

Without Thread.sleep(200) the test fails for one of two reasons:

  • broken pipe exception
  • incorrect MD5 sum

With the delay in place it works fine. I am using a file with 10 megabytes of random data.

jdevelop asked May 16 '13 16:05


2 Answers

This is a very simple race condition in your code: you're writing fixed-size messages to the pipe, and assuming that you can read the same messages back. However, you have no idea how much data is available in the pipe for any given read.

If you prefix your writes with the number of bytes written, and ensure that each read only reads that number of bytes, you'll see that pipes work exactly as advertised.
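A minimal sketch of that length-prefix idea in Java follows. It is not the asker's code: the class name `LengthPrefixedFraming` and the helpers `writeFrame` and `readFrame` are hypothetical, and an in-memory buffer stands in for the named pipe so the example runs anywhere. It assumes each message fits in a single byte array (under 2 GB).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only.
class LengthPrefixedFraming {

    // Write one message: a 4-byte big-endian length, then the payload bytes.
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    // Read one message: the length first, then exactly that many bytes.
    // readFully keeps reading until the array is full, so a short read
    // from the underlying stream cannot truncate the message.
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for the pipe (an assumption made for portability).
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        writeFrame(out, "first file".getBytes(StandardCharsets.UTF_8));
        writeFrame(out, "second".getBytes(StandardCharsets.UTF_8));

        // The two messages arrive back to back, yet are read separately.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(in), StandardCharsets.UTF_8));
    }
}
```

With a real named pipe, the same helpers could presumably wrap a `FileOutputStream` and a `FileInputStream` on the pipe path, each opened once and kept open across all messages, so that message boundaries come from the length prefix rather than from the timing of open and close calls.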

If you have a situation with multiple writers and/or multiple readers, I recommend using an actual message queue. Actually, I recommend using a message queue in any case, as it solves the issue of message boundary demarcation; there's little point in reinventing that particular wheel.

parsifal answered Sep 22 '22 17:09


Am I correct to suppose that, within a single process, having two threads read from and write to a named pipe does not block the reader or writer at all?

Not unless you are using non-blocking I/O, which you aren't.

So with wrong timings it's possible to miss some data?

Not unless you are using non-blocking I/O, which you aren't.

user207421 answered Sep 21 '22 17:09