How to determine shard id for a specific partition key with KCL?

The PutRecord API uses partition keys to determine the shard ID for a record. Even though the PutRecord response contains the shard ID, it isn't reliable: shards can be split (or merged), so later records with the same partition key may land in a different shard. I couldn't find a way to determine the shard ID for a specific partition key on the consumer side.

It seems that AWS maps partition keys to 128-bit integer keys, but the hashing algorithm is not explained in the documentation. What I want to do is process only the records in a Kinesis stream that have a specific partition key. Those records should all be in one shard, so I could fetch data from that shard alone, but I couldn't find the appropriate API in the documentation.

burak emre asked Mar 15 '23 03:03


1 Answer

According to the documentation, the hashing algorithm used is MD5.

An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards using the hash key ranges of the shards.

See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
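Written as a formula, the quoted mapping amounts to the following. This assumes, as the code in this answer does, that the key is encoded as UTF-8 and the 16-byte digest is read as an unsigned big-endian integer; it also only holds when the producer does not set PutRecord's optional ExplicitHashKey, which overrides the partition key hash.

```latex
h(k) = \operatorname{uint}_{128}\!\left(\mathrm{MD5}\big(\operatorname{utf8}(k)\big)\right),
  \qquad 0 \le h(k) \le 2^{128} - 1

\text{a record with key } k \text{ goes to shard } s
  \iff \mathit{StartingHashKey}_s \le h(k) \le \mathit{EndingHashKey}_s
```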

In your situation, if you know the partition key whose shard you want to identify, you need to do two things:

  1. Calculate the MD5 hash of the partition key and interpret the 16-byte digest as an unsigned 128-bit integer.
  2. Go through the list of shards to find the shard whose hash key range includes the hash value calculated in the first step.
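The two steps above can be sketched as one self-contained Java class that runs without AWS credentials. It is a minimal sketch, assuming UTF-8 encoding of the key and a big-endian reading of the digest (as the snippets in this answer do); `HashRange` is a hypothetical stand-in for the SDK's `Shard`, and the two-shard layout in `main` is made up for illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

class ShardLookup {

    /** Hypothetical stand-in for a shard ID plus its inclusive hash key range. */
    static final class HashRange {
        final String shardId;
        final BigInteger start;
        final BigInteger end;

        HashRange(String shardId, BigInteger start, BigInteger end) {
            this.shardId = shardId;
            this.start = start;
            this.end = end;
        }
    }

    /** Step 1: MD5 of the key's UTF-8 bytes, read as an unsigned 128-bit integer. */
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 keeps the value non-negative
        } catch (NoSuchAlgorithmException e) {
            // MD5 ships with the standard JDK providers, so this should not happen there
            throw new IllegalStateException(e);
        }
    }

    /** Step 2: return the ID of the range containing the key's hash, or null if none does. */
    static String findShardId(String partitionKey, List<HashRange> ranges) {
        BigInteger h = hashKey(partitionKey);
        for (HashRange r : ranges) {
            // Both ends of a shard's hash key range are inclusive
            if (r.start.compareTo(h) <= 0 && r.end.compareTo(h) >= 0) {
                return r.shardId;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical two-shard stream splitting the 2^128 key space in half
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        List<HashRange> ranges = List.of(
                new HashRange("shardId-000000000000", BigInteger.ZERO, half.subtract(BigInteger.ONE)),
                new HashRange("shardId-000000000001", half, max));
        System.out.println(findShardId("abc", ranges));
    }
}
```

With that layout, the MD5 of `abc` (`900150983cd24fb0d6963f7d28e17f72`) has its top bit set, so it falls in the upper half of the key space and the sketch prints `shardId-000000000001`.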

Here are some code snippets to get you started:

MD5 Hash as BigInteger

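import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
String partitionKey = "YourKnownKey";
byte[] partitionBytes = partitionKey.getBytes(StandardCharsets.UTF_8);
byte[] hashBytes = MessageDigest.getInstance("MD5").digest(partitionBytes); // throws NoSuchAlgorithmException
// signum 1: treat the 16-byte digest as an unsigned 128-bit integer
BigInteger biPartitionKey = new BigInteger(1, hashBytes);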

Find Shard for Partition Key

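import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.StreamDescription;
import java.math.BigInteger;
import java.util.List;

// client: an AmazonKinesis instance (AWS SDK for Java v1); biPartitionKey: from the snippet above
Shard shardYouAreAfter = null;
String streamName = "YourStreamName";
// One call returns a single page of shards; check getHasMoreShards() on large streams
StreamDescription streamDesc = client.describeStream(streamName).getStreamDescription();
List<Shard> shards = streamDesc.getShards();
for (Shard shard : shards) {
    BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
    BigInteger endingHashKey = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
    // Both ends of a shard's hash key range are inclusive
    if (startingHashKey.compareTo(biPartitionKey) <= 0
            && endingHashKey.compareTo(biPartitionKey) >= 0) {
        shardYouAreAfter = shard;
        break;
    }
}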

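Things get more complicated if you have split or merged shards: DescribeStream also returns the closed parent shards, whose hash key ranges overlap those of their child shards, so more than one shard can match. The code above assumes that only active (open) shards exist.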
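One way to cope with split or merged shards is to skip closed shards before the range check. The sketch below assumes that approach; `ShardInfo` is a hypothetical plain-Java mirror of the SDK `Shard` fields it needs (`getHashKeyRange()` and `getSequenceNumberRange().getEndingSequenceNumber()`, which is null while a shard is open), and the split shown in `main` is made up.

```java
import java.math.BigInteger;
import java.util.List;

class OpenShardLookup {

    /** Hypothetical mirror of the Shard fields used here; endingSequenceNumber is null while open. */
    static final class ShardInfo {
        final String shardId;
        final BigInteger startingHashKey;
        final BigInteger endingHashKey;
        final String endingSequenceNumber;

        ShardInfo(String shardId, BigInteger startingHashKey, BigInteger endingHashKey,
                  String endingSequenceNumber) {
            this.shardId = shardId;
            this.startingHashKey = startingHashKey;
            this.endingHashKey = endingHashKey;
            this.endingSequenceNumber = endingSequenceNumber;
        }
    }

    /** Returns the ID of the open shard whose inclusive range contains hash, or null if none does. */
    static String findOpenShard(BigInteger hash, List<ShardInfo> shards) {
        for (ShardInfo s : shards) {
            if (s.endingSequenceNumber != null) {
                continue; // closed parent left by a split or merge; its range overlaps its children's
            }
            if (s.startingHashKey.compareTo(hash) <= 0 && s.endingHashKey.compareTo(hash) >= 0) {
                return s.shardId;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BigInteger half = BigInteger.ONE.shiftLeft(127);
        BigInteger max = BigInteger.ONE.shiftLeft(128).subtract(BigInteger.ONE);
        // Hypothetical stream in which shard 0 was split into shards 1 and 2
        List<ShardInfo> shards = List.of(
                new ShardInfo("shardId-000000000000", BigInteger.ZERO, max, "placeholder-ending-seq"),
                new ShardInfo("shardId-000000000001", BigInteger.ZERO, half.subtract(BigInteger.ONE), null),
                new ShardInfo("shardId-000000000002", half, max, null));
        System.out.println(findOpenShard(half, shards)); // prints shardId-000000000002
    }
}
```

This identifies the shard that receives new records for the key. Records written before a split stay in the closed parent until the stream's retention period expires, so a consumer that needs every record for the key would read the parent before its children.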

ryanskinner answered Apr 06 '23 09:04