I have a header and data that I need to represent in a single byte array. The header is packed in one format and the data in a different one; once I have both, I need to combine them into one final byte array.
The layout below is defined in C++, and I have to reproduce it in Java.
// below is my header offsets layout
// addressedCenter must be the first byte
static constexpr uint32_t addressedCenter = 0;
static constexpr uint32_t version = addressedCenter + 1;
static constexpr uint32_t numberOfRecords = version + 1;
static constexpr uint32_t bufferUsed = numberOfRecords + sizeof(uint32_t);
static constexpr uint32_t location = bufferUsed + sizeof(uint32_t);
static constexpr uint32_t locationFrom = location + sizeof(CustomerAddress);
static constexpr uint32_t locationOrigin = locationFrom + sizeof(CustomerAddress);
static constexpr uint32_t partition = locationOrigin + sizeof(CustomerAddress);
static constexpr uint32_t copy = partition + 1;
// this is the full size of the header
static constexpr uint32_t headerOffset = copy + 1;
And CustomerAddress is a typedef for uint64_t, and it is made up like this:
typedef uint64_t CustomerAddress;
void client_data(uint8_t datacenter,
uint16_t clientId,
uint8_t dataId,
uint32_t dataCounter,
CustomerAddress& customerAddress)
{
customerAddress = (uint64_t(datacenter) << 56)
+ (uint64_t(clientId) << 40)
+ (uint64_t(dataId) << 32)
+ dataCounter;
}
And below is my data layout -
// below is my data layout -
//
// key type - 1 byte
// key len - 1 byte
// key (variable size = key_len)
// timestamp (sizeof uint64_t)
// data size (sizeof uint16_t)
// data (variable size = data size)
Problem statement:
For one part of the project, I am trying to represent all of this in a single Java class, so that I can pass in the necessary fields and get back a final byte array that has the header first and then the data.
Below is my DataFrame class:
public final class DataFrame {
private final byte addressedCenter;
private final byte version;
private final Map<byte[], byte[]> keyDataHolder;
private final long location;
private final long locationFrom;
private final long locationOrigin;
private final byte partition;
private final byte copy;
public DataFrame(byte addressedCenter, byte version,
Map<byte[], byte[]> keyDataHolder, long location, long locationFrom,
long locationOrigin, byte partition, byte copy) {
this.addressedCenter = addressedCenter;
this.version = version;
this.keyDataHolder = keyDataHolder;
this.location = location;
this.locationFrom = locationFrom;
this.locationOrigin = locationOrigin;
this.partition = partition;
this.copy = copy;
}
public byte[] serialize() {
// All of the data is embedded in a binary array with fixed maximum size 70000
ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
byteBuffer.order(ByteOrder.BIG_ENDIAN);
int numOfRecords = keyDataHolder.size();
int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
// header layout
byteBuffer.put(addressedCenter); // byte
byteBuffer.put(version); // byte
byteBuffer.putInt(numOfRecords); // int
byteBuffer.putInt(bufferUsed); // int
byteBuffer.putLong(location); // long
byteBuffer.putLong(locationFrom); // long
byteBuffer.putLong(locationOrigin); // long
byteBuffer.put(partition); // byte
byteBuffer.put(copy); // byte
// now the data layout
for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
byte keyType = 0;
byte keyLength = (byte) entry.getKey().length;
byte[] key = entry.getKey();
byte[] data = entry.getValue();
short dataSize = (short) data.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(data);
long timestamp = 0;
if (dataSize > 10) {
timestamp = dataBuffer.getLong(2);
}
byteBuffer.put(keyType);
byteBuffer.put(keyLength);
byteBuffer.put(key);
byteBuffer.putLong(timestamp);
byteBuffer.putShort(dataSize);
byteBuffer.put(data);
}
return byteBuffer.array();
}
private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) {
int size = 36;
for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
}
And below is how I am using the above DataFrame class:
public static void main(String[] args) throws IOException {
// header layout
byte addressedCenter = 0;
byte version = 1;
long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);
byte partition = 3;
byte copy = 0;
// this map will have key as the actual key and value as the actual data, both in byte array
// for now I am storing only two entries in this map
Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
for (int i = 1; i <= 2; i++) {
keyDataHolder.put(generateKey(), getMyData());
}
DataFrame records =
new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom,
locationOrigin, partition, copy);
// this will give me final packed byte array
// which will have header and data in it.
byte[] packedArray = records.serialize();
}
private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
int dataCounter) {
return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32)
| ((long) dataCounter);
}
As you can see in my DataFrame class, I am allocating the ByteBuffer with a predefined size of 70000. Is there a better way by which I can allocate the size I am using while making ByteBuffer instead of using a hardcoded 70000?
Also is there any better way as compared to what I am doing which packs my header and data in one byte array? I also need to make sure it is thread safe since it can be called by multiple threads.
By default, the order of a ByteBuffer object is BIG_ENDIAN. If a byte order is passed as a parameter to the order method, it modifies the byte order of the buffer and returns the buffer itself. The new byte order may be either LITTLE_ENDIAN or BIG_ENDIAN.
A ByteBuffer is a buffer which provides for transferring bytes from a source to a destination. In addition to storage like a buffer array, it also provides abstractions such as current position, limit, capacity, etc. A FileChannel is used for transferring data to and from a file to a ByteBuffer.
wrap: Wraps a byte array into a buffer. The new buffer will be backed by the given byte array; that is, modifications to the buffer will cause the array to be modified and vice versa. The new buffer's capacity and limit will be array.length.
Is there a better way by which I can allocate the size I am using while making ByteBuffer instead of using a hardcoded 70000?
There are at least two non-overlapping approaches, and you may use both.
One is buffer pooling. You should find out how many buffers you need during peak periods, and use a maximum above it, e.g. max + max / 2, max + average, max + mode, 2 * max.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;
public class ByteBufferPool {
private final int bufferCapacity;
private final LinkedBlockingDeque<ByteBuffer> queue;
public ByteBufferPool(int limit, int bufferCapacity) {
if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");
this.bufferCapacity = bufferCapacity;
this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
}
public ByteBuffer acquire() {
ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
if (buffer == null) {
buffer = ByteBuffer.allocate(bufferCapacity);
}
else {
buffer.clear();
buffer.order(ByteOrder.BIG_ENDIAN);
}
return buffer;
}
public boolean release(ByteBuffer buffer) {
if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");
return (queue == null) ? false : queue.offerFirst(buffer);
}
public void withBuffer(Consumer<ByteBuffer> action) {
if (action == null) throw new IllegalArgumentException("action must not be null.");
ByteBuffer buffer = acquire();
try {
action.accept(buffer);
}
finally {
release(buffer);
}
}
public <T> T withBuffer(Function<ByteBuffer, T> function) {
if (function == null) throw new IllegalArgumentException("function must not be null.");
ByteBuffer buffer = acquire();
try {
return function.apply(buffer);
}
finally {
release(buffer);
}
}
public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) {
if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");
ByteBuffer buffer = acquire();
CompletionStage<T> future = null;
try {
future = asyncFunction.apply(buffer);
}
finally {
if (future == null) {
release(buffer);
}
else {
future = future.whenComplete((result, throwable) -> release(buffer));
}
}
return future;
}
}
The withBuffer methods allow straightforward use of the pool, while acquire and release allow separating the acquisition and release points.
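As a rough illustration of the withBuffer style, here is a minimal sketch (not tested against your code). SimpleBufferPool is a hypothetical, trimmed-down stand-in for the pool above so that the example compiles on its own, and encodeHeader is an invented helper that writes only the first three header fields.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Function;

// Hypothetical, trimmed-down pool: same acquire/release idea, no argument checks.
final class SimpleBufferPool {
    private final int bufferCapacity;
    private final LinkedBlockingDeque<ByteBuffer> queue;

    SimpleBufferPool(int limit, int bufferCapacity) {
        this.bufferCapacity = bufferCapacity;
        this.queue = new LinkedBlockingDeque<>(limit); // limit must be at least 1 here
    }

    ByteBuffer acquire() {
        ByteBuffer buffer = queue.pollFirst();
        if (buffer == null) {
            return ByteBuffer.allocate(bufferCapacity); // pool empty: allocate a fresh one
        }
        buffer.clear();                      // reset position and limit for reuse
        buffer.order(ByteOrder.BIG_ENDIAN);  // undo any order change by a previous user
        return buffer;
    }

    boolean release(ByteBuffer buffer) {
        return queue.offerFirst(buffer);     // silently dropped if the pool is full
    }

    <T> T withBuffer(Function<ByteBuffer, T> function) {
        ByteBuffer buffer = acquire();
        try {
            return function.apply(buffer);
        } finally {
            release(buffer);
        }
    }
}

final class PoolUsageExample {
    // Writes three header fields into a pooled buffer and returns only the bytes used.
    static byte[] encodeHeader(SimpleBufferPool pool, byte addressedCenter, byte version,
                               int numberOfRecords) {
        return pool.withBuffer(buffer -> {
            buffer.put(addressedCenter).put(version).putInt(numberOfRecords);
            // Copy out before the buffer goes back to the pool.
            return Arrays.copyOf(buffer.array(), buffer.position());
        });
    }
}
```

Note that the result is copied out inside the callback; once withBuffer returns, the buffer may already be in use by another thread.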
Another one is segregating the serialization interface, e.g. put, putInt and putLong, so that you can then implement both a byte-counting class and an actual byte-buffering class. You should add a method to such an interface to tell whether the serializer is counting bytes or buffering, in order to avoid unnecessary byte generation, and another method to increment byte usage directly, which is useful when calculating the size of a string in some encoding without actually serializing it.
public interface ByteSerializer {
ByteSerializer put(byte value);
ByteSerializer putInt(int value);
ByteSerializer putLong(long value);
boolean isSerializing();
ByteSerializer add(int bytes);
int position();
}
public class ByteCountSerializer implements ByteSerializer {
private int count = 0;
@Override
public ByteSerializer put(byte value) {
count += 1;
return this;
}
@Override
public ByteSerializer putInt(int value) {
count += 4;
return this;
}
@Override
public ByteSerializer putLong(long value) {
count += 8;
return this;
}
@Override
public boolean isSerializing() {
return false;
}
@Override
public ByteSerializer add(int bytes) {
if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");
count += bytes;
return this;
}
@Override
public int position() {
return count;
}
}
import java.nio.ByteBuffer;
public class ByteBufferSerializer implements ByteSerializer {
private final ByteBuffer buffer;
public ByteBufferSerializer(int bufferCapacity) {
if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");
this.buffer = ByteBuffer.allocate(bufferCapacity);
}
@Override
public ByteSerializer put(byte value) {
buffer.put(value);
return this;
}
@Override
public ByteSerializer putInt(int value) {
buffer.putInt(value);
return this;
}
@Override
public ByteSerializer putLong(long value) {
buffer.putLong(value);
return this;
}
@Override
public boolean isSerializing() {
return true;
}
@Override
public ByteSerializer add(int bytes) {
if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");
for (int b = 0; b < bytes; b++) {
buffer.put((byte)0);
}
return this;
// or throw new UnsupportedOperationException();
}
@Override
public int position() {
return buffer.position();
}
public ByteBuffer buffer() {
return buffer;
}
}
In your code, you'd do something along these lines (not tested):
ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteBufferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
// ... write buffer, ?, profit ...
Your DataFrame.serialize method should be refactored to accept a ByteSerializer, and in cases where it would generate data, it should check isSerializing to know whether it should only calculate the size or actually write bytes.
I leave combining both approaches as an exercise, mainly because it depends a lot on how you decide to do it.
For instance, you may make ByteBufferSerializer use the pool directly and keep an arbitrary capacity (e.g. your 70000); you may pool ByteBuffers by capacity (but instead of the needed capacity, use the least power of 2 greater than the needed capacity, and set the buffer's limit before returning from acquire); or you may pool ByteBufferSerializers directly, as long as you add a reset() method.
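To make the count-then-write idea concrete for one record of your data layout, here is a minimal sketch (not tested against your code). RecordSink, CountingSink, BufferSink and TwoPassExample are hypothetical names; compared with the ByteSerializer interface above, this variant assumes extra putShort and put(byte[]) methods, since your record layout needs them.

```java
import java.nio.ByteBuffer;

// Hypothetical interface: the serializer idea, extended with putShort and put(byte[]).
interface RecordSink {
    RecordSink put(byte value);
    RecordSink putShort(short value);
    RecordSink putLong(long value);
    RecordSink put(byte[] values);
    int position();
}

// First pass: only counts the bytes that would be written.
final class CountingSink implements RecordSink {
    private int count;
    public RecordSink put(byte value) { count += 1; return this; }
    public RecordSink putShort(short value) { count += 2; return this; }
    public RecordSink putLong(long value) { count += 8; return this; }
    public RecordSink put(byte[] values) { count += values.length; return this; }
    public int position() { return count; }
}

// Second pass: writes into a buffer allocated with exactly the counted size.
final class BufferSink implements RecordSink {
    private final ByteBuffer buffer;
    BufferSink(int capacity) { this.buffer = ByteBuffer.allocate(capacity); }
    public RecordSink put(byte value) { buffer.put(value); return this; }
    public RecordSink putShort(short value) { buffer.putShort(value); return this; }
    public RecordSink putLong(long value) { buffer.putLong(value); return this; }
    public RecordSink put(byte[] values) { buffer.put(values); return this; }
    public int position() { return buffer.position(); }
    byte[] array() { return buffer.array(); }
}

final class TwoPassExample {
    // One record: key type, key length, key, timestamp, data size, data.
    static void writeRecord(RecordSink sink, byte[] key, long timestamp, byte[] data) {
        sink.put((byte) 0)
            .put((byte) key.length)
            .put(key)
            .putLong(timestamp)
            .putShort((short) data.length)
            .put(data);
    }

    static byte[] serializeRecord(byte[] key, long timestamp, byte[] data) {
        CountingSink counter = new CountingSink();
        writeRecord(counter, key, timestamp, data);               // pass 1: size only
        BufferSink out = new BufferSink(counter.position());
        writeRecord(out, key, timestamp, data);                   // pass 2: actual bytes
        return out.array();                                       // exactly full, no padding
    }
}
```

Because the same writeRecord method drives both passes, the computed size cannot drift from what is actually written, which is the main advantage over a separate hand-maintained size calculation.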
Also is there any better way as compared to what I am doing which packs my header and data in one byte array?
Yes. Pass around the byte buffering instance instead of having certain methods return byte arrays which are discarded the moment after their length is checked or their contents are copied.
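A minimal sketch of what passing the buffer around could look like, assuming the 36-byte header and the record layout from your question. FrameWriter and its method names are hypothetical; the timestamp is passed in by the caller here rather than extracted from the data.

```java
import java.nio.ByteBuffer;

final class FrameWriter {
    // 1 + 1 + 4 + 4 + 8 + 8 + 8 + 1 + 1 bytes, matching headerOffset in the C++ layout.
    static final int HEADER_SIZE = 36;

    private FrameWriter() {}

    // Appends the header at the buffer's current position.
    static void writeHeader(ByteBuffer out, byte addressedCenter, byte version,
                            int numberOfRecords, int bufferUsed, long location,
                            long locationFrom, long locationOrigin, byte partition, byte copy) {
        out.put(addressedCenter).put(version)
           .putInt(numberOfRecords).putInt(bufferUsed)
           .putLong(location).putLong(locationFrom).putLong(locationOrigin)
           .put(partition).put(copy);
    }

    // Appends one record at the buffer's current position.
    static void writeRecord(ByteBuffer out, byte[] key, long timestamp, byte[] data) {
        out.put((byte) 0)                      // key type
           .put((byte) key.length)             // key length
           .put(key)
           .putLong(timestamp)
           .putShort((short) data.length)      // data size
           .put(data);
    }

    // Copies only the bytes written so far, leaving the caller's position untouched.
    static byte[] toArray(ByteBuffer out) {
        ByteBuffer view = out.duplicate();     // independent position and limit, shared content
        view.flip();                           // limit = bytes written, position = 0
        byte[] result = new byte[view.remaining()];
        view.get(result);
        return result;
    }
}
```

Note that returning byteBuffer.array() from a buffer allocated with 70000 bytes hands back all 70000 bytes, padding included; copying only up to the current position, as toArray does, avoids that.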
I also need to make sure it is thread safe since it can be called by multiple threads.
As long as each buffer is used by only one thread at a time, with proper synchronization, you don't have to worry.
Proper synchronization means your pool manager has acquire and release semantics in its methods, and that if a buffer is used by multiple threads between fetching it from and returning it to the pool, you add release semantics in the thread that stops using the buffer and acquire semantics in the thread that starts using it. For instance, you shouldn't have to worry about this if you're passing the buffer through CompletableFutures, or if you're communicating explicitly between threads with an Exchanger or a proper implementation of BlockingQueue.
From java.util.concurrent's package description:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular:
Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
Actions in a thread prior to the submission of a Runnable to an Executor happen-before its execution begins. Similarly for Callables submitted to an ExecutorService.
Actions taken by the asynchronous computation represented by a Future happen-before actions subsequent to the retrieval of the result via Future.get() in another thread.
Actions prior to "releasing" synchronizer methods such as Lock.unlock, Semaphore.release, and CountDownLatch.countDown happen-before actions subsequent to a successful "acquiring" method such as Lock.lock, Semaphore.acquire, Condition.await, and CountDownLatch.await on the same synchronizer object in another thread.
For each pair of threads that successfully exchange objects via an Exchanger, actions prior to the exchange() in each thread happen-before those subsequent to the corresponding exchange() in another thread.
Actions prior to calling CyclicBarrier.await and Phaser.awaitAdvance (as well as its variants) happen-before actions performed by the barrier action, and actions performed by the barrier action happen-before actions subsequent to a successful return from the corresponding await in other threads.
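As an illustration of the first guarantee, here is a minimal sketch of handing a filled buffer from one thread to another through a BlockingQueue. HandOffExample and handOff are hypothetical names; the point is only that the queue's put and take provide the release and acquire semantics, so no extra locking around the buffer is needed.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class HandOffExample {
    private HandOffExample() {}

    // Fills a buffer in a producer thread, hands it over, and reads it in the caller.
    static long handOff(long value) {
        BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(value);
            buffer.flip();                 // prepare the buffer for reading
            try {
                queue.put(buffer);         // writes above happen-before the take() below
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The producer must not touch the buffer after this point.
        });
        producer.start();

        try {
            ByteBuffer received = queue.take();  // safely sees everything the producer wrote
            producer.join();
            return received.getLong();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the buffer", e);
        }
    }
}
```

The discipline that matters is ownership: after put, only the receiving thread uses the buffer until it is handed on again or returned to the pool.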