
Java library to write binary format for Postgres COPY?

Has anyone come across a Java library (or just some code) to write the binary format used by Postgres' COPY command?

It looks very simple, but if someone's already figured out the correct tuple data format, I'd just as well start there.

Actually, even just the description of the formats for all data types would be helpful.

Thanks.

Asked Jan 09 '13 by Dmitri

2 Answers

You could try PgBulkInsert, which implements the Binary Copy Protocol of PostgreSQL:

  • https://github.com/bytefish/PgBulkInsert

It is also available from the Maven Central Repository.

Disclaimer: I am the project author.

PostgreSQL Binary Copy Protocol

I don't want to simply advertise my project, so let me also describe the protocol itself.

First of all, I have written a class PgBinaryWriter, which wraps a DataOutputStream and provides: a method for writing the binary protocol header, a method to start a new row (the binary COPY protocol requires you to write the number of columns for each row you are going to insert), and a generic write method that takes an IValueHandler<TTargetType> for writing a given Java type.

The PgBinaryWriter implements AutoCloseable, because a 16-bit -1 (the file trailer) has to be written to the stream before flushing and closing it.

The IValueHandler<TTargetType> takes a DataOutputStream and a value. It is responsible for writing the given value with the PostgreSQL Binary Protocol Format.

PgBinaryWriter

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.pgsql;

import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The wrapped DataOutputStream to write the output to. */
    private DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {

            // 11 bytes required header
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32 bit integer indicating no OID
            buffer.writeInt(0);
            // 32 bit header extension area length
            buffer.writeInt(0);

        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}

ValueHandler

An IValueHandler is a simple interface, which has a handle method to take the DataOutputStream and a value.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

    void handle(DataOutputStream buffer, final TTargetType value);

    Type getTargetType();

}

An important detail of the protocol: when a value is null, you have to write a -1 as the field length instead of any data. For this I have written an abstract base class that handles this case.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}

Then the handlers for the various Java types can be implemented. Here is the example for Long: a non-null field is written as its byte length (8 for a bigint) followed by the big-endian value. You can find the other implementations in the GitHub repository (handlers).

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}
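
Following the same pattern, a handler for text columns is easy to sketch. The helper below is not code from the repository, just my reading of the format: a non-null text field is its UTF-8 byte length as a 32-bit integer, followed by the raw bytes, and null is signalled by a length of -1.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of a String handler, analogous to LongValueHandler above.
public class StringFieldSketch {

    static void writeTextField(DataOutputStream buffer, String value) throws IOException {
        if (value == null) {
            buffer.writeInt(-1);          // null is signalled by length -1
            return;
        }
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        buffer.writeInt(utf8.length);     // field length in bytes, not characters
        buffer.write(utf8);               // the UTF-8 encoded payload
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeTextField(new DataOutputStream(out), "abc");
        // 4-byte length prefix (3) + 3 payload bytes = 7 bytes total
        System.out.println(out.size());
    }
}
```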

Using the PgBinaryWriter

Now we can finally connect the parts. Note that I have abstracted away some details here, so you may need to look up the rest of the implementation in the repository.

public abstract class PgBulkInsert<TEntity> {

    // ... 

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:                
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);
                
                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }
    
    private String getCopyCommand() {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}

PgBulkInsert

PgBulkInsert supports the following PostgreSQL data types.

  • Numeric Types
    • smallint
    • integer
    • bigint
    • real
    • double precision
  • Date/Time Types
    • timestamp
    • date
  • Character Types
    • text
  • Boolean Type
    • boolean
  • Binary Data Types
    • bytea
  • Network Address Types
    • inet (IPv4, IPv6)
  • UUID Type
    • uuid

Basic Usage

Imagine a large number of persons should be bulk inserted into a PostgreSQL database. Each Person has a first name, a last name and a birth date.

Database Table

The table in the PostgreSQL database might look like this:

CREATE TABLE sample.unit_test
(
    first_name text,
    last_name text,
    birth_date date
);

Domain Model

The domain model in the application might look like this:

public class Person {

    private String firstName;

    private String lastName;

    private LocalDate birthDate;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public LocalDate getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDate birthDate) {
        this.birthDate = birthDate;
    }
    
}

Bulk Inserter

Then you subclass PgBulkInsert<Person>, which defines the mapping between the table and the domain model.

public class PersonBulkInserter extends PgBulkInsert<Person>
{
    public PersonBulkInserter() {
        super("sample", "unit_test");

        MapString("first_name", Person::getFirstName);
        MapString("last_name", Person::getLastName);
        MapDate("birth_date", Person::getBirthDate);
    }
}

Using the Bulk Inserter

And finally we can write a Unit Test to insert 100000 Persons into the database. You can find the entire Unit Test on GitHub: IntegrationTest.java.

@Test
public void bulkInsertPersonDataTest() throws SQLException {
    // Create a large list of Persons:
    List<Person> persons = getPersonList(100000);
    
    // Create the BulkInserter:
    PersonBulkInserter personBulkInserter = new PersonBulkInserter();
    
    // Now save all entities of a given stream:
    personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());
    
    // And assert all have been written to the database:
    Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
    List<Person> persons = new ArrayList<>();

    for (int pos = 0; pos < numPersons; pos++) {
        Person p = new Person();

        p.setFirstName("Philipp");
        p.setLastName("Wagner");
        p.setBirthDate(LocalDate.of(1986, 5, 12));

        persons.add(p);
    }

    return persons;
}
Answered Nov 09 '22 by bytefish


Have you considered just using the CopyManager from the JDBC driver? Otherwise, you can probably derive the implementation from the QueryExecutorImpl.
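
To illustrate that route: with the text format, CopyManager streams plain tab-separated rows and no binary encoding is needed at all. The sketch below uses placeholder table and column names; the one driver call is shown in a comment, since it needs a live connection.

```java
// Sketch of the CopyManager route from the PostgreSQL JDBC driver.
// With a live org.postgresql.PGConnection `conn`, the call would be roughly:
//
//     conn.getCopyAPI().copyIn(sql, new java.io.StringReader(rows));
//
public class CopyManagerSketch {

    // Build the COPY statement for the default text format (no BINARY keyword):
    static String copyCommand(String table, String... columns) {
        return "COPY " + table + " (" + String.join(", ", columns) + ") FROM STDIN";
    }

    // Happy path with no escaping: tab-separated columns, newline-terminated row.
    static String row(String... fields) {
        return String.join("\t", fields) + "\n";
    }

    public static void main(String[] args) {
        String sql = copyCommand("sample.unit_test", "first_name", "last_name", "birth_date");
        String rows = row("Philipp", "Wagner", "1986-05-12");
        System.out.println(sql);
        System.out.print(rows);
    }
}
```

Note that the text format requires escaping of tabs, newlines and backslashes inside values, which the happy-path `row` helper above does not handle.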

Answered Nov 09 '22 by ig0774