I am working on a real-time data pipeline with AWS Kinesis and Lambda, and I am trying to figure out how to guarantee that records from the same data producer are processed by the same shard and, ultimately, by the same Lambda function instance.
My approach is to use partition keys to make sure records from the same producer land on the same shard. However, I cannot manage to get records from the same shard processed by the same Lambda function instance.
The basic setup looks like this:
As you can see in the figure, three Lambda function instances are invoked for processing, one per shard. In this pipeline, it is important that records from the same data source are processed by the same Lambda function instance. From what I have read, this can be guaranteed by giving all records from the same source the same partition key, so that they are processed by the same shard.
Partition Keys
A partition key is used to group data by shard within a stream. The Kinesis Data Streams service segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given data record belongs to. Partition keys are Unicode strings with a maximum length limit of 256 bytes. An MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. When an application puts data into a stream, it must specify a partition key.
Source: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key
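For intuition, the key-to-shard mapping described above can be reproduced locally: MD5 the partition key, interpret the 16-byte digest as a 128-bit integer, and check which shard's hash key range it falls into. A minimal sketch (the function names are mine; in a real setup the hash key ranges would come from DescribeStream's StartingHashKey/EndingHashKey fields):

```python
import hashlib

def partition_key_to_hash(partition_key: str) -> int:
    """Map a partition key to the 128-bit integer Kinesis uses internally.

    Kinesis hashes the key with MD5; all records with the same key get the
    same hash value and therefore land on the same shard.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")

def shard_for_key(partition_key: str, hash_key_ranges: list) -> int:
    """Return the index of the shard whose hash key range covers the key.

    hash_key_ranges is a list of (start, end) tuples; assumption: the
    ranges cover 0 .. 2**128 - 1 without gaps, as they do for a stream
    whose shards have never been split or merged unevenly.
    """
    h = partition_key_to_hash(partition_key)
    for i, (start, end) in enumerate(hash_key_ranges):
        if start <= h <= end:
            return i
    raise ValueError("hash key ranges do not cover the key")
```

This makes it easy to verify, producer-side, that two keys really do map to the same or different shards.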
This works: records with the same partition key are processed by the same shard. However, they are processed by different Lambda function instances. There is one Lambda function instance invoked per shard, but each instance does not process records from only one shard; it receives records from multiple shards. There seems to be no pattern to how records are handed over to Lambda.
Here is my test setup: I sent a batch of test data into the stream and printed the records in the Lambda function. Below is the output of the three function instances (check the partition key at the end of each row; each key should appear in only one of the three logs, not in multiple):
Lambda instance 1:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 103, 'id': 207, 'data': 'ce2', 'partitionKey': '103'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 205, 'data': 'ce5', 'partitionKey': '101'}
Lambda instance 2:
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 102, 'id': 206, 'data': 'ce1', 'partitionKey': '102'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
Lambda instance 3:
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 100, 'id': 200, 'data': 'ce', 'partitionKey': '100'}
{'type': 'c', 'source': 101, 'id': 201, 'data': 'ce1', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 202, 'data': 'ce2', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 203, 'data': 'ce3', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
{'type': 'c', 'source': 101, 'id': 204, 'data': 'ce4', 'partitionKey': '101'}
This is how I insert the data into the stream (as you can see, the partition key is set to the source id):
import json

import boto3

kinesis = boto3.client('kinesis')

processed_records = []
for r in records:
    processed_records.append({
        'PartitionKey': str(r['source']),
        'Data': json.dumps(r),
    })

kinesis.put_records(
    StreamName=stream,
    Records=processed_records,
)
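One detail worth noting about PutRecords: the call can partially fail (e.g. on throughput limits), and naively re-sending the whole batch would create exactly the kind of duplicates visible in the logs above. A minimal retry sketch that re-sends only the failed entries, based on the standard boto3 PutRecords response shape (`FailedRecordCount`, per-record `ErrorCode`); the helper name is mine:

```python
def retry_failed_records(client, stream, records, attempts=3):
    """Send records via PutRecords, retrying only the entries that failed.

    A PutRecords response reports FailedRecordCount and, for each failed
    entry, an ErrorCode in the parallel Records list. Retrying the whole
    batch would duplicate the records that already succeeded, so we keep
    only the failed ones. Returns the records still failing after all
    attempts (empty list on full success).
    """
    for _ in range(attempts):
        response = client.put_records(StreamName=stream, Records=records)
        if response["FailedRecordCount"] == 0:
            return []
        # Per-record results are positionally aligned with the request;
        # failed entries carry an ErrorCode instead of a SequenceNumber.
        records = [
            rec
            for rec, res in zip(records, response["Records"])
            if "ErrorCode" in res
        ]
    return records
```

Called as `retry_failed_records(kinesis, stream, processed_records)`, this preserves the per-key ordering for the retried records relative to each other, though a retried record can still land after a later record with the same key that succeeded on the first attempt.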
So my question is: how can I make sure that records from the same shard are always processed by the same Lambda function instance?
Thanks!
A partition key is used to group data by shard within a stream. Kinesis Data Streams segregates the data records belonging to a stream into multiple shards. It uses the partition key that is associated with each data record to determine which shard a given data record belongs to.
Specifically, an MD5 hash function is used to map partition keys to 128-bit integer values and to map associated data records to shards. As a result of this hashing mechanism, all data records with the same partition key map to the same shard within the stream.
Though records are guaranteed to be ordered within a shard, and each record will end up in exactly one shard (thanks to the partition keys), Kinesis Data Streams does not provide an "exactly once" guarantee. This means you will see duplicate records in a shard every now and then.
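Because of this, consumers typically need to be idempotent. One common approach is to deduplicate on each record's sequence number, which is unique within a shard. A minimal sketch, assuming the standard Lambda Kinesis event shape (`record["kinesis"]["sequenceNumber"]`); note that an in-memory `seen` set only survives for the lifetime of one Lambda execution environment, so in practice the seen-set would have to live in durable storage such as DynamoDB:

```python
def dedupe(records, seen):
    """Drop records whose sequence number has already been processed.

    records: entries from a Lambda Kinesis event's "Records" list.
    seen: a set of sequence numbers processed so far (in a real pipeline
    this state must be durable, since Lambda instances are stateless).
    """
    out = []
    for rec in records:
        seq = rec["kinesis"]["sequenceNumber"]  # unique per record in a shard
        if seq not in seen:
            seen.add(seq)
            out.append(rec)
    return out
```

With idempotent processing in place, occasional duplicates and shard-to-instance reassignments stop being a correctness problem.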
Why would you care which Lambda instance processes a shard? Lambda instances hold no state anyway, so it shouldn't matter which instance reads which shard. What matters more is that, at any given time, a Lambda instance reads from only ONE shard. After it finishes an invocation, it may read from another shard.