How do I convert an Avro GenericRecord to JSON while converting timestamp fields from milliseconds to a datetime?
I am currently using Avro 1.8.2.
// Sample instant (2018-06-28T23:29:48.182Z) expressed as epoch milliseconds.
Timestamp tsp = new Timestamp(1530228588182L);
// A plain LONG schema annotated with the timestamp-millis logical type.
Schema tspSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema schema = SchemaBuilder.builder()
        .record("hello")
        .fields()
        .name("tsp").type(tspSchema).noDefault()
        .endRecord();
System.out.println(schema.toString());
// The record holds the raw epoch-millis long; no date/time object is stored.
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("tsp", tsp.getTime()); // Assume I cannot change this
System.out.println(genericRecord.toString());
I tried using the function below, but the result is the same as genericRecord.toString():
/**
 * Serializes {@code genericRecord} using Avro's JSON encoding.
 *
 * <p>A private {@link GenericData} carries the TimestampConversion, so the shared
 * {@code GenericData.get()} instance used by other readers/writers is never modified.
 *
 * <p>NOTE(review): when the timestamp-millis field holds a raw {@code Long} (as in the snippet
 * above), Avro selects conversions by the datum's class, so this conversion is not applied on
 * write and the JSON still contains the epoch-millis number. It only takes effect if the
 * field holds the conversion's date/time type.
 *
 * @param schema the schema used both for the writer and for the JSON encoder
 * @param genericRecord the record to serialize
 * @return the JSON text
 * @throws IOException if encoding fails
 */
public static String toJsonString(Schema schema, GenericRecord genericRecord) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // Don't register on writer.getData(): the one-arg GenericDatumWriter constructor uses the
    // process-wide GenericData.get() singleton, so each call would mutate global state.
    GenericData data = new GenericData();
    data.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
    GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema, data);
    JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
    writer.write(genericRecord, encoder);
    encoder.flush();
    // The JSON encoder writes UTF-8 bytes; decode them explicitly, not with the platform charset.
    return baos.toString("UTF-8");
}
Third Attempt
/**
 * Decodes Avro binary {@code data} into a GenericRecord. Decoding uses a GenericData
 * instance with TimestampConversion and TimeConversion registered, plus a toString override
 * meant to quote date/time values.
 *
 * <p>NOTE(review): the "Current Output" shows the timestamp still unquoted, so the override
 * below does not seem to take effect. Possible reasons, all to be confirmed against Avro 1.8.2:
 * (1) GenericData.Record.toString() appears to render via GenericData.get(), not via this
 * instance; (2) only the public toString(Object) is overridden, while record fields seem to be
 * rendered through a separate protected overload, so nested values bypass it; (3) the
 * registered conversions appear to produce Joda date/time objects rather than java.sql types,
 * so the instanceof check would not match.
 *
 * @param schema used as both the writer and the reader schema
 * @param data Avro binary-encoded bytes for a single record
 * @return the decoded record
 * @throws IOException if decoding fails
 */
public static GenericRecord deserialize(final Schema schema, byte[] data) throws IOException {
final GenericData genericData = new GenericData(){
// Attempts to quote date/time values when this GenericData renders a datum as text.
@Override
public String toString(Object datum) {
StringBuilder buffer = new StringBuilder();
// Since these types are not quoted and produce a malformed JSON string, quote it here.
if (datum instanceof java.sql.Timestamp || datum instanceof java.sql.Time || datum instanceof java.sql.Date) {
return buffer.append("\"").append(datum).append("\"").toString();
}
return super.toString(datum);
}
};
genericData.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
genericData.addLogicalTypeConversion(new TimeConversions.TimeConversion());
try (final InputStream is = new ByteArrayInputStream(data)) {
final Decoder decoder = DecoderFactory.get().binaryDecoder(is, null);
// Same schema for writer and reader; genericData supplies the logical-type conversions.
final DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, schema, genericData);
return reader.read(null, decoder);
}
}
Schema
{"type":"record","name":"tsp_name","fields":[{"name":"tsp","type":{"type":"long","logicalType":"timestamp-millis"}}]}
Current Output
{"tsp":2018-06-28T23:29:48.182Z} // missing quotes, so this is not valid JSON
Expected Output
{"tsp": "2018-06-28T23:29:48.182Z"}
A generic instance of a record schema. Fields are accessible by name as well as by index.
Avro schema definitions are JSON records. Because it is a record, it can define multiple fields which are organized in a JSON array. Each such field identifies the field's name as well as its type. The type can be something simple, like an integer, or something complex, like another record.
To change the projection, you can write a custom conversion that returns a String for the timestamp-millis logical type. The following code produces your expected output:
import org.apache.avro.*;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Timestamp;
public class Main5 {
public static void main(String [] args ) throws IOException {
Timestamp tsp = new Timestamp(1530228588182L);
String strSchema = "{\"type\":\"record\",\"name\":\"tsp_name\",\"fields\":[{\"name\":\"tsp\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}}]}\n";
Schema schema = new Schema.Parser().parse(strSchema);
System.out.println(new DateTime(tsp.getTime(), DateTimeZone.UTC));
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("tsp",tsp.getTime()); //Assume I cannot change this
System.out.println(genericRecord);
System.out.println(deserialize(schema, toByteArray(schema , genericRecord)));
}
public static byte [] toByteArray(Schema schema, GenericRecord genericRecord) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
writer.getData().addLogicalTypeConversion(new TimeConversions.TimestampConversion());
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
writer.write(genericRecord, encoder);
encoder.flush();
return baos.toByteArray();
}
public static GenericRecord deserialize(Schema schema, byte[] data) throws IOException {
final GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new MyTimestampConversion());
InputStream is = new ByteArrayInputStream(data);
Decoder decoder = DecoderFactory.get().binaryDecoder(is, null);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, schema, genericData);
return reader.read(null, decoder);
}
public static class MyTimestampConversion extends Conversion<String> {
public MyTimestampConversion() {
}
public Class<String> getConvertedType() {
return String.class;
}
public String getLogicalTypeName() {
return "timestamp-millis";
}
public String fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
return (new DateTime(millisFromEpoch, DateTimeZone.UTC)).toString();
}
public Long toLong(String timestamp, Schema schema, LogicalType type) {
return new Long(timestamp);
}
}
}
Output: {"tsp": "2018-06-28T23:29:48.182Z"}
`
If you like our content, you can donate via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!
Donate Us With