
How to notify PipedInputStream thread that PipedOutputStream thread has written last byte?

How do I finish work correctly at the output end of the pipe? I need the writing thread to terminate or move on to other work while the reading thread reads all the written data up to the end.

Should I close the pipe at the writing end or what?

UPDATE 1

I want to clarify... According to the given answers, am I correct in thinking that, by design, pipes do not support any graceful termination?

I.e., once opened, is the only way to stop piping to break the pipe?

Conventional streams signal end of stream when read() returns -1. Am I right in thinking that this never happens with piped streams?

Dims asked Mar 29 '12 09:03


1 Answer

Yes, closing the PipedOutputStream makes read() on the connected PipedInputStream return -1 once the remaining buffered bytes have been read.

Looks pretty graceful to me! Here's my SSCCE:

import java.io.*;
import java.nio.charset.*;

public class SOPipe
{
    public static void main(String[] args) throws Exception
    {
        PipedOutputStream os = new PipedOutputStream();
        PipedInputStream is = new PipedInputStream(os);

        ReaderThread readerThread = new ReaderThread(is);
        WriterThread writerThread = new WriterThread(os);

        readerThread.start();
        writerThread.start();

        readerThread.join();
        writerThread.join();

        System.out.println("Both Reader and Writer completed.");
        System.out.println("Main method returning normally now.");
    }

    private static final Charset LATIN1 = Charset.forName("latin1");

    public static class WriterThread extends Thread
    {
        private final PipedOutputStream _os;

        public WriterThread(PipedOutputStream os)
        {
            _os = os;
        }

        public void run()
        {
            try
            {
                String msg = "Ceci n'est pas une pipe";
                byte[] msgBytes = msg.getBytes(LATIN1);
                System.out.println("WriterThread sending message: " + msg);
                for(int i = 0; i < msgBytes.length; i++)
                {
                    _os.write(msgBytes, i, 1);
                    System.out.println("WriterThread wrote a byte!");
                    _os.flush();
                }
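                // Closing the write end is the end-of-stream signal:
                // the reader's read() returns -1 after draining the buffer.
                _os.close();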
                System.out.println("[COMPLETED] WriterThread");
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }

    public static class ReaderThread extends Thread
    {
        private final PipedInputStream _is;

        public ReaderThread(PipedInputStream is)
        {
            _is = is;
        }

        public void run()
        {
            try
            {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1];
                int read;
                while ((read = _is.read(buffer, 0, 1)) != -1)
                {
                    System.out.println("ReaderThread read a byte!");
                    baos.write(buffer, 0, read);
                }
                System.out.println("[COMPLETED] ReaderThread; received: " 
                        + new String(baos.toByteArray(), LATIN1));
                _is.close();
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    }
}
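
One thing the SSCCE above does not cover is a writer that fails before reaching close(). If the writing thread dies without closing the pipe, the reader typically gets an IOException instead of a clean -1. A minimal sketch of one way to guard against that, assuming Java 8 or later, is to wrap the write end in try-with-resources so that close() always runs. The class name PipeCloseSketch and the helper roundTrip are made up for illustration and are not part of the original answer:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;

public class PipeCloseSketch
{
    // Hypothetical helper: sends msg through a pipe and returns what the reader received.
    public static String roundTrip(String msg) throws Exception
    {
        PipedOutputStream os = new PipedOutputStream();
        PipedInputStream is = new PipedInputStream(os);

        Thread writer = new Thread(() -> {
            // try-with-resources closes the write end even if write() throws,
            // so the reader still sees end of stream (-1).
            try (PipedOutputStream out = os)
            {
                out.write(msg.getBytes(StandardCharsets.ISO_8859_1));
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
            // The pipe is closed at this point; the writer thread is free
            // to terminate or carry on with unrelated work.
        });
        writer.start();

        // The calling thread acts as the reader and drains the pipe until -1.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (PipedInputStream in = is)
        {
            byte[] buffer = new byte[64];
            int read;
            while ((read = in.read(buffer)) != -1)
            {
                baos.write(buffer, 0, read);
            }
        }
        writer.join();
        return new String(baos.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(roundTrip("Ceci n'est pas une pipe"));
    }
}
```

The reader side is unchanged in spirit: it loops until read() returns -1. Only the writer differs, in that closing is tied to leaving the try block rather than to reaching an explicit close() call.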
Mike Clark answered Oct 12 '22 23:10