Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why Postgres Replication Stream doesn't work when used in separate function?

I am working on postgres replication stream API. While working on it came across unusual behavior.

When I use replication slot writing whole code inside main block, everything works fine.

public class Server implements Config {

public static void main(String[] args) {
    Properties prop = new Properties();
    prop.load(new FileInputStream(System.getProperty("prop")));

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

    Properties props = new Properties();
    PGProperty.USER.set(props, user);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection conn= null;
    PGConnection replicationConnection= null;
    PGReplicationStream stream = null;

        conn = DriverManager.getConnection(url, props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp", "on")
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS).start();

    while (true) {
            ByteBuffer msg;
            try {
                msg = stream.readPending();         

            if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            // convert byte buffer into string
            String data = new String(source, offset, length);

            // then convert it into bufferedreader
            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        }
}

But when I try to separate stream logic using separate method like this

public class Server implements Config {

public static void main(String[] args) {
    PGReplicationStream stream = getReplicationStream();
        while (true) {
            ByteBuffer msg;
            try {
                msg = stream.readPending();
    if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            String data = new String(source, offset, length);

            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
        }

}
public static PGReplicationStream getReplicationStream() {
    Properties prop = new Properties();
    try {
        prop.load(new FileInputStream(System.getProperty("prop")));
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

    Properties props = new Properties();
    PGProperty.USER.set(props, user);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection conn= null;
    PGConnection replicationConnection= null;
    PGReplicationStream stream = null;
    try {
        conn = DriverManager.getConnection(url, props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp", "on")
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS).start();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return stream;
}

}

After reading some data, program is giving error

org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
at org.postgresql.core.PGStream.receive(PGStream.java:495)
at org.postgresql.core.PGStream.receive(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
... 5 more

I don't see any difference between both approach. But the program is behaving differently. Can someone explain, what is wrong with second approach.

like image 662
Dipesh Avatar asked Aug 08 '17 15:08

Dipesh


2 Answers

I believe your problem with with Connection. It gets out of scope once your function returns and it is upto garbage collector to collect it and finalize. in finalize, connection is closed and probably then your program fails. Try moving connection and other required intermediate variables in a scope that is available in your main method and try again.

like image 74
xycf7 Avatar answered Sep 30 '22 09:09

xycf7


There is some moderately bogus behavior with respect to closing a connection while doing a COPY. I wonder if that could be involved?

If memory serves, the issue was that a Terminate message (sent on Connection.close() to indicate that the client wants to end the protocol connection) is not synchronized relative to COPY protocol operations. This means that theoretically, your Terminate message could be shimmed right into the middle of a CopyData message. In practice, I've only ever seen an error from the server complaining about an unexpected message type (Terminate) during COPY, since the client is supposed to send CopyDone or CopyFail before a Terminate while a COPY is in effect. Still, I think it might be possible to get message contents munged.

I think your code might be susceptible to that since you kick off a bunch of COPY operations in a separate function and there might be a possibility that connection might get garbage collected.

As I see in the logs that socket connection is closed while copy protocol is still reading the data.

org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
like image 43
Neha Chopra Avatar answered Sep 30 '22 11:09

Neha Chopra