I can't seem to find a decent example that shows how I can consume an AWS Kinesis stream via Python. Can someone please provide some examples I could look into?
Best
To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that continuously reads and processes streaming data. The service supports ingesting data from Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources.
Kafka-Kinesis-Connector for Kinesis is used to publish messages from Kafka to Amazon Kinesis Streams. Kafka-Kinesis-Connector can be executed on on-premise nodes or EC2 machines. It can be executed in standalone mode as well as distributed mode.
To read from a stream continually, call GetRecords in a loop. Use GetShardIterator to get the shard iterator to specify in the first GetRecords call. GetRecords returns a new shard iterator in NextShardIterator. Specify the shard iterator returned in NextShardIterator in subsequent calls to GetRecords.
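A minimal sketch of that loop, assuming a boto3-style Kinesis client (boto3 is not mentioned above; the keyword arguments follow its get_shard_iterator and get_records calls). The helper name iter_shard_records and the max_polls and idle_sleep parameters are hypothetical.

```python
import time


def iter_shard_records(client, stream_name, shard_id,
                       iterator_type="TRIM_HORIZON",
                       limit=100, max_polls=10, idle_sleep=1.0):
    """Yield records from one shard by calling GetRecords in a loop."""
    # The first iterator comes from GetShardIterator.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType=iterator_type,
    )["ShardIterator"]

    for _ in range(max_polls):
        response = client.get_records(ShardIterator=iterator, Limit=limit)
        for record in response["Records"]:
            yield record
        # Every later call uses the iterator returned by the previous one.
        iterator = response.get("NextShardIterator")
        if iterator is None:
            # A closed shard has no next iterator once it is fully read.
            break
        if not response["Records"]:
            # An empty batch is normal; pause before polling again.
            time.sleep(idle_sleep)


# Usage (assumes boto3 is installed and credentials are configured):
#   import boto3
#   client = boto3.client("kinesis", region_name="us-east-1")
#   for record in iter_shard_records(client, "stream_name", "shardId-000000000000"):
#       print(record["Data"])
```

Passing the client in, rather than creating it inside the helper, keeps the loop easy to exercise against a stub before pointing it at a real stream.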
Lambda reads records from the data stream and invokes your function synchronously with an event that contains stream records. With an enhanced fan-out consumer, Kinesis pushes records to Lambda over HTTP/2; otherwise Lambda polls the shards itself. For details about Kinesis data streams, see Reading Data from Amazon Kinesis Data Streams.
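For the Lambda route, a handler might look like the sketch below. It assumes the usual Kinesis event shape, where each entry in Records carries its payload base64-encoded under kinesis.data; the handler name and the choice to treat payloads as UTF-8 text are assumptions.

```python
import base64


def lambda_handler(event, context):
    """Decode the payload of every Kinesis record in the invocation event."""
    payloads = []
    for record in event.get("Records", []):
        # Kinesis record data arrives base64-encoded inside the event.
        raw = base64.b64decode(record["kinesis"]["data"])
        payloads.append(raw.decode("utf-8"))  # assumption: payloads are UTF-8 text
    # Replace this with real processing; returning the list keeps the sketch testable.
    return payloads
```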
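You should use boto.kinesis (the snippets below also need time and logging):
    import logging
    import time

    from boto import kinesis

    logger = logging.getLogger(__name__)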
After you have created a stream:
Step 1: connect to AWS Kinesis:
    auth = {"aws_access_key_id": "id", "aws_secret_access_key": "key"}
    connection = kinesis.connect_to_region('us-east-1', **auth)
Step 2: get the stream info (how many shards it has, whether it is active, and so on):
    tries = 0
    while tries < 10:
        tries += 1
        time.sleep(1)
        try:
            response = connection.describe_stream('stream_name')
            if response['StreamDescription']['StreamStatus'] == 'ACTIVE':
                break
        except Exception as exc:
            # Log the failure and retry on the next pass.
            logger.error('error while trying to describe kinesis stream : %s', exc)
    else:
        # Runs only when the loop finishes without hitting break.
        raise TimeoutError('Stream is still not active, aborting...')
Step 3: get all shard IDs, and for each shard ID get a shard iterator:
    # shard_iterator_type selects the starting position, e.g. 'TRIM_HORIZON' or 'LATEST'.
    shard_ids = []
    stream_name = None
    if response and 'StreamDescription' in response:
        stream_name = response['StreamDescription']['StreamName']
        for shard in response['StreamDescription']['Shards']:
            shard_id = shard['ShardId']
            shard_iterator = connection.get_shard_iterator(stream_name, shard_id, shard_iterator_type)
            shard_ids.append({'shard_id': shard_id, 'shard_iterator': shard_iterator['ShardIterator']})
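One caveat for streams with many shards: describe_stream may return the shard list in pages, flagged by HasMoreShards in StreamDescription. A sketch of collecting every shard ID across pages, assuming the legacy boto connection from step 1 and its exclusive_start_shard_id argument (the helper name list_all_shard_ids is hypothetical):

```python
def list_all_shard_ids(connection, stream_name):
    """Collect every shard ID, following describe_stream pagination."""
    shard_ids = []
    last_shard_id = None
    while True:
        response = connection.describe_stream(
            stream_name, exclusive_start_shard_id=last_shard_id)
        description = response['StreamDescription']
        shard_ids.extend(shard['ShardId'] for shard in description['Shards'])
        # Stop when the service reports no further pages (or returns an empty page).
        if not description.get('HasMoreShards') or not description['Shards']:
            return shard_ids
        # Ask for the page that starts after the last shard seen so far.
        last_shard_id = shard_ids[-1]
```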
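Step 4: read the data for each shard.
limit is the maximum number of records you want to receive per call (a single call can return up to 10 MB). shard_iterator is the shard iterator from the previous step. The loop returns its result, so it is wrapped in a function here (read_records is just an illustrative name).
    def read_records(connection, shard_iterator, limit):
        tries = 0
        result = []
        while tries < 100:
            tries += 1
            response = connection.get_records(shard_iterator=shard_iterator, limit=limit)
            shard_iterator = response['NextShardIterator']
            if len(response['Records']) > 0:
                for res in response['Records']:
                    result.append(res['Data'])
                return result, shard_iterator
        # No records after 100 polls: return the empty result and the latest iterator.
        return result, shard_iterator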
In your next call to get_records, use the shard_iterator that you received with the result of the previous get_records call.
Note: a single call to get_records (even with limit = None) can return an empty list of records, so keep polling with the returned iterator. Records are read per shard, and records that share a partition key always land in the same shard (when you put data into the stream, you have to supply a partition key):
    connection.put_record(stream_name, data, partition_key)
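To see why records with the same partition key end up together: Kinesis takes the MD5 hash of the partition key as a 128-bit integer and routes the record to the shard whose hash key range contains it. The sketch below reproduces that mapping locally. The helper names and the evenly split ranges are assumptions for illustration; the real ranges come from each shard's HashKeyRange in the describe_stream response.

```python
import hashlib


def partition_hash(partition_key):
    """Return the 128-bit integer MD5 hash of a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def even_hash_ranges(shard_count):
    """Split the 128-bit hash space into equal ranges (an assumption)."""
    top = 2 ** 128
    bounds = [top * i // shard_count for i in range(shard_count + 1)]
    return [(bounds[i], bounds[i + 1] - 1) for i in range(shard_count)]


def shard_index_for(partition_key, ranges):
    """Find the index of the range that contains the key's hash."""
    value = partition_hash(partition_key)
    for index, (start, end) in enumerate(ranges):
        if start <= value <= end:
            return index
    raise ValueError("hash ranges do not cover the key")
```

Because the mapping depends only on the key, calling shard_index_for twice with the same key and ranges always gives the same index, which is what preserves per-key ordering within a shard.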
While this question has already been answered, it might be a good idea for future readers to consider using the Kinesis Client Library (KCL) for Python instead of using boto directly. It simplifies consuming from the stream when you have multiple consumer instances and/or changing shard configurations.
https://aws.amazon.com/blogs/aws/speak-to-kinesis-in-python/
A more complete enumeration of what the KCL provides
The items in bold are the ones where I think the KCL really provides non-trivial value over boto. But depending on your use case, boto may be much, much simpler.