
Using the partition key in Kinesis to guarantee that records with the same key are processed by the same record processor (lambda)

I am working on a real-time data pipeline with AWS Kinesis and Lambda, and I am trying to figure out how I can guarantee that records from the same data producer end up in the same shard and are ultimately processed by the same Lambda function instance.

My approach is to use partition keys so that records from the same producer land in the same shard. However, I cannot get the second part to work: records from the same shard are not consistently processed by the same Lambda function instance.

The basic setup is the following:

  • There are multiple data sources that send data to a Kinesis stream.
  • The stream has more than one shard to handle the load.
  • There is a Lambda function connected to the stream with an event source mapping (batch size 500); a sketch of how such a mapping can be created follows this list.
  • The Lambda function processes the records, does some data transformations and a few other things, and then puts everything into Firehose.
  • There is more happening later on, but that is not relevant for the question.
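
For reference, such a mapping could be created with boto3 roughly like this (function name and stream ARN are placeholders, not the ones from my setup):

import boto3

lambda_client = boto3.client('lambda')

# Connect the Lambda function to the Kinesis stream, 500 records per batch
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:eu-west-1:123456789012:stream/my-stream',
    FunctionName='my-processing-function',
    BatchSize=500,
    StartingPosition='LATEST',
)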

It looks like this:

[Figure: data sources → Kinesis stream with three shards → one Lambda function instance per shard → Firehose]

As you can see in the figure, there are three Lambda function instances invoked for processing, one for each shard. In this pipeline, it is important that records from the same data source are processed by the same Lambda function instance. According to what I have read, this can be guaranteed by making sure that all records from the same source use the same partition key, so that they end up in the same shard.

Partition Keys

A partition key is used to group data by shard within a stream. The Kinesis Data Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.

Source: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
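
To make that mapping concrete, the shard for a given key can be reproduced locally: hash the key with MD5, interpret the digest as a 128-bit integer, and compare it against each shard's hash key range. A small sketch of that check (the stream name is a placeholder, and describe_stream pagination is ignored):

import hashlib

import boto3

kinesis = boto3.client('kinesis')

def shard_for_key(stream_name, partition_key):
    # MD5 of the partition key, interpreted as a 128-bit integer
    hash_value = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)

    # Every shard owns a contiguous slice of the 128-bit hash key space
    shards = kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['Shards']
    for shard in shards:
        key_range = shard['HashKeyRange']
        if int(key_range['StartingHashKey']) <= hash_value <= int(key_range['EndingHashKey']):
            return shard['ShardId']

print(shard_for_key('my-stream', '101'))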

This works: records with the same partition key end up in the same shard. However, they are processed by different Lambda function instances. There is one Lambda function instance invoked per shard, but it does not only process records from one shard; it processes records from multiple shards. There seems to be no pattern to how batches are handed over to Lambda.

Here is my test setup: I sent a bunch of test data into the stream and printed the records in the Lambda function (a minimal handler sketch that also logs the shard ID follows the logs). This is the output of the three function instances. Check the partition keys at the end of each row: each key should only appear in one of the three logs, not in several of them:

Lambda instance 1:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}

Lambda instance 2:

{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}

Lambda instance 3:

{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
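
A minimal handler along these lines, extended to also print which shard each record came from, could look like this (the shard ID is the prefix of each record's eventID, and the record data arrives base64 encoded):

import base64
import json

def handler(event, context):
    for record in event['Records']:
        # eventID looks like "shardId-000000000002:<sequence number>"
        shard_id = record['eventID'].split(':')[0]
        partition_key = record['kinesis']['partitionKey']
        # Record payloads are delivered base64 encoded
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        print(shard_id, partition_key, payload)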

This is how I insert the data into the stream (as you can see, the partition key is set to the source id):

import json

import boto3

kinesis = boto3.client('kinesis')

# The partition key is the source ID, so all records from one source
# hash to the same shard.
processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)
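
A side note on this producer code: put_records can partially fail, and those failures only show up in the response, not as an exception. A rough sketch of retrying the failed entries could look like this (retry count and backoff are arbitrary):

import time

def put_with_retries(kinesis, stream, entries, attempts=3):
    for attempt in range(attempts):
        response = kinesis.put_records(StreamName=stream, Records=entries)
        if response['FailedRecordCount'] == 0:
            return
        # Results line up with the request entries; failed ones carry an ErrorCode
        entries = [entry for entry, result in zip(entries, response['Records'])
                   if 'ErrorCode' in result]
        time.sleep(2 ** attempt)
    raise RuntimeError('records still failing after {} attempts'.format(attempts))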

So my questions would be:

  • Why doesn't each Lambda function instance process records from exactly one shard only?
  • How can this be done?

Thanks!

asked Jul 11 '18 by oneschilling


1 Answer

Why would you care which Lambda instance processes a shard? Lambda instances have no state anyway, so it shouldn't matter which instance reads which shard. What is more important is that at any point in time a Lambda instance will only read from ONE shard. After it has finished an invocation, it may read from another shard.
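
To convince yourself of that last point, the shard can be read from each record's eventID within a single invocation; with a standard Kinesis event source mapping, every record in one batch should carry the same shard prefix. A small sketch (assuming the usual Kinesis event shape):

def handler(event, context):
    shard_ids = {record['eventID'].split(':')[0] for record in event['Records']}
    # One invocation receives one batch, and that batch comes from a single shard
    assert len(shard_ids) == 1, 'unexpected mix of shards in one batch: {}'.format(shard_ids)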

answered Oct 22 '22 by Joseph