Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Serializing Java objects to Cassandra 1.2 via ByteBuffer & CQL 3

I've cobbled together the below code that doesn't do anything complex -- just creates a byte[] variable, writes it into a blob field in Cassandra (v1.2, via the new Datastax CQL library), then reads it back out again.

When I put it in it's 3 elements long, and when I read it back it's 84 elements long...! This means the thing I'm actually trying to do (serialize Java objects) fails with an org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008 error when trying to deserialize again.

Here's some sample code that demonstrates my problem:

import java.nio.ByteBuffer;

import org.apache.commons.lang.SerializationUtils;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TestCassandraSerialization {


    private Cluster cluster;
    private Session session;

    public TestCassandraSerialization(String node) {
        connect(node);
    }

    private void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        Metadata metadata = cluster.getMetadata();
        System.out.printf("Connected to %s\n", metadata.getClusterName());
        for (Host host: metadata.getAllHosts()) {
              System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
                         host.getDatacenter(), host.getAddress(), host.getRack());
        }
        session = cluster.connect();
    }

    public void setUp() {
        session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");

        session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)");
    }

    public void tearDown() {
        session.execute("DROP KEYSPACE test_serialization");
    }

    public void insertIntoTable(String key, byte[] data) {
        PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)");
        BoundStatement boundStatement = new BoundStatement(statement);
        session.execute(boundStatement.bind(key,ByteBuffer.wrap(data)));
    }

    public byte[] readFromTable(String key) {
        String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '"+key+"';";

        ResultSet results = session.execute(q1);
        for (Row row : results) {
            ByteBuffer data = row.getBytes("data");
            return data.array();
        }
        return null;
    }


    public static boolean compareByteArrays(byte[] one, byte[] two) {
        if (one.length > two.length) {
            byte[] foo = one;
            one = two;
            two = foo;
        }

        // so now two is definitely the longer array    
        for (int i=0; i<one.length; i++) {
            //System.out.printf("%d: %s\t%s\n", i, one[i], two[i]);
            if (one[i] != two[i]) {
                return false;
            }
        }
        return true;
    }


    public static void main(String[] args) {
        TestCassandraSerialization tester = new TestCassandraSerialization("localhost");

        try {
            tester.setUp();
            byte[] dataIn = new byte[]{1,2,3};
            tester.insertIntoTable("123", dataIn);
            byte[] dataOut = tester.readFromTable("123");

            System.out.println(dataIn);
            System.out.println(dataOut);

            System.out.println(dataIn.length); // prints "3"
            System.out.println(dataOut.length); // prints "84"

            System.out.println(compareByteArrays(dataIn, dataOut)); // prints false         

            String toSave = "Hello, world!";
            dataIn = SerializationUtils.serialize(toSave);
            tester.insertIntoTable("toSave", dataIn);
            dataOut = tester.readFromTable("toSave");

            System.out.println(dataIn.length); // prints "20"
            System.out.println(dataOut.length); // prints "104"


            // The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
            String hasLoaded = (String) SerializationUtils.deserialize(dataOut); 
            System.out.println(hasLoaded);

        } finally {
            tester.tearDown();
        }
    }
}

It looks like the right stuff makes it into the database:

cqlsh:flight_cache> select * from test_serialization.test_table;

 id     | data
--------+--------------------------------------------
    123 |                                   0x010203
 toSave | 0xaced000574000d48656c6c6f2c20776f726c6421

cqlsh:flight_cache> 

So it looks like an error when reading, rather than writing, the binary data. Can anyone give me any pointers as to what I'm doing wrong?

like image 660
Richard Gaywood Avatar asked Jun 24 '13 18:06

Richard Gaywood


People also ask

What is serialization in Java?

Java - Serialization. Java provides a mechanism, called object serialization where an object can be represented as a sequence of bytes that includes the object's data as well as information about the object's type and the types of data stored in the object. After a serialized object has been written into a file, it can be read from the file and ...

How to connect to Cassandra from Java?

In order to connect to Cassandra from Java, we need to build a Cluster object. An address of a node needs to be provided as a contact point. If we don't provide a port number, the default port (9042) will be used. These settings allow the driver to discover the current topology of a cluster. 3.3. Creating the Keyspace

How do I implement a custom data type in Cassandra?

For example, custom data types can be implemented by extending the AbstractType class. The Cassandra Java Driver makes it easy to access Cassandra from Java applications. Cassandra also features significant Java-based configuration and monitoring and can even be customized with Java.

What programming language is Cassandra written in?

It is worth noting that Cassandra is written in Java. The advantage of this for Java developers is that many of Cassandra's configuration values are JVM options that Java developers are already familiar with.


1 Answers

The problem is almost certainly because the array returned by ByteBuffer.array() is the full backing array, but the data may only be contained within a portion of it.

The valid data that is being returned starts at ByteBuffer.arrayOffset() and is of length ByteBuffer.remaining(). To get a byte array containing just the valid data use this code in readFromTable:

byte[] result = new byte[data.remaining()];
data.get(result);

then your data is in result and you can return that.

like image 173
Richard Avatar answered Nov 10 '22 01:11

Richard