AWS Kinesis .NET Consumer

I am experimenting with a producer and a consumer using AWS Kinesis. The issue is that the consumer keeps receiving the first message (record) that we produced, even though we have changed the data object sent multiple times. We have also tried multiple ShardIteratorType values and none of them worked: LATEST produces no results, and all the others return the same original record.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Newtonsoft.Json;

namespace ConsoleApp7
{
    internal class Program
    {
        private static AmazonKinesisClient _client;
        private static string _streamName;

        static async Task ReadFromStream()
        {
            var kinesisStreamName = _streamName;

            var describeRequest = new DescribeStreamRequest
            {
                StreamName = kinesisStreamName,
            };

            var describeResponse = await _client.DescribeStreamAsync(describeRequest);
            var shards = describeResponse.StreamDescription.Shards;

            foreach (var shard in shards)
            {
                var iteratorRequest = new GetShardIteratorRequest
                {
                    StreamName = kinesisStreamName,
                    ShardId = shard.ShardId,
                    ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
                    Timestamp = DateTime.MinValue
                };

                var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
                var iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    var getRequest = new GetRecordsRequest
                    {
                        ShardIterator = iteratorId, Limit = 10000
                    };

                    var getResponse = await _client.GetRecordsAsync(getRequest);
                    var nextIterator = getResponse.NextShardIterator;
                    var records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (var record in records)
                        {
                            var json = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("Json string: " + json);
                        }
                    }

                    iteratorId = nextIterator;
                }
            }
        }

        private static async Task<string> Produce()
        {
            var data = new
            {
                Message = "Hello world!",
                Author = "Amir"
            };

            //convert to byte array in prep for adding to stream
            var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

            using (var ms = new MemoryStream(oByte))
            {
                // create the put request
                var requestRecord = new PutRecordRequest
                {
                    StreamName = _streamName,                 // name of the Kinesis stream
                    PartitionKey = Guid.NewGuid().ToString(), // the partition key decides which shard gets the record; a random key spreads records across shards
                    Data = ms                                 // the record payload as a MemoryStream
                };

                //PUT the record to Kinesis
                var response = await _client.PutRecordAsync(requestRecord);

                return response.SequenceNumber;
            }
        }

        static void Main(string[] args)
        {
            _client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);

            _streamName = "SomeStream";

            Produce().Wait();

            ReadFromStream().Wait();
        }
    }
}
asked May 03 '19 20:05 by Node.JS

1 Answer

First of all, when I debugged your code I noticed that it loops forever in the inner loop (while (!string.IsNullOrEmpty(iteratorId))) and never moves on to the other shards in your stream (assuming you have more than one). The reason is explained in https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty: because the stream was never resharded (nobody called MergeShards or SplitShard), its shards remain open, so NextShardIterator will never be null.

This is why you only ever see the records that were put on the first shard (at least, that is what I saw when running your code). You must read from the shards in parallel.
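A minimal sketch of reading every shard concurrently, assuming the AWS SDK for .NET (AWSSDK.Kinesis) and a stream whose shards all come back in a single DescribeStream page (the call is paginated; this sketch reads only the first page). `ParallelShardReader`, `ReadShardAsync` and the `stopWhenCaughtUp` flag are hypothetical names for illustration. Each shard gets its own polling loop, and `Task.WhenAll` runs them side by side; with `stopWhenCaughtUp` set, a loop exits once `MillisBehindLatest` reports it has reached the tip of its shard, which suits a short-lived console demo like the one in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

public static class ParallelShardReader
{
    // Hypothetical helper: starts one polling loop per shard and waits for all of them.
    // Assumes every shard is listed in the first DescribeStream page.
    public static async Task ReadAllShardsAsync(
        IAmazonKinesis client, string streamName, bool stopWhenCaughtUp, CancellationToken token)
    {
        var describe = await client.DescribeStreamAsync(
            new DescribeStreamRequest { StreamName = streamName }, token);

        var readers = describe.StreamDescription.Shards
            .Select(shard => ReadShardAsync(client, streamName, shard.ShardId, stopWhenCaughtUp, token))
            .ToList();

        await Task.WhenAll(readers);
    }

    private static async Task ReadShardAsync(
        IAmazonKinesis client, string streamName, string shardId,
        bool stopWhenCaughtUp, CancellationToken token)
    {
        var iteratorResponse = await client.GetShardIteratorAsync(new GetShardIteratorRequest
        {
            StreamName = streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.TRIM_HORIZON
        }, token);

        var iterator = iteratorResponse.ShardIterator;

        // For an open shard NextShardIterator is never null, so this loop only ends on its
        // own if the shard is closed by resharding, if we break out below, or if the
        // token is cancelled (which surfaces as an OperationCanceledException).
        while (!string.IsNullOrEmpty(iterator))
        {
            var response = await client.GetRecordsAsync(
                new GetRecordsRequest { ShardIterator = iterator, Limit = 10000 }, token);

            foreach (var item in response.Records ?? new List<Record>())
            {
                var json = Encoding.UTF8.GetString(item.Data.ToArray());
                Console.WriteLine($"[{shardId}] {item.SequenceNumber}: {json}");
            }

            // MillisBehindLatest == 0 means there are no further records in this shard right now.
            if (stopWhenCaughtUp && response.MillisBehindLatest == 0)
            {
                break;
            }

            iterator = response.NextShardIterator;

            // Pause between polls to stay under the per-shard limit of 5 GetRecords calls per second.
            await Task.Delay(TimeSpan.FromSeconds(1), token);
        }
    }
}
```

In the question's `Main`, this could replace `ReadFromStream().Wait()` with `ParallelShardReader.ReadAllShardsAsync(_client, _streamName, true, CancellationToken.None).Wait();`. For a long-running consumer you would pass `false` and a token you cancel on shutdown.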

As far as your usage pattern goes, you're using:

ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue

By this, you're essentially telling Kinesis "give me all the records in the stream from the beginning of time" (or at least as far as the retention period reaches). That's why you keep seeing the same old records in addition to new ones (again, that's what I saw when I ran your code).
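For comparison, here is a sketch of the usual starting positions, assuming the AWS SDK for .NET; the `StartPositions` class and its method names are hypothetical. TRIM_HORIZON starts at the oldest record still retained, which is effectively what AT_TIMESTAMP with DateTime.MinValue does. LATEST starts just after the most recent record at the moment the iterator is created, which likely explains why LATEST returned nothing in the question: `Produce()` finishes before the iterator is requested, so that record is already behind it.

```csharp
using System;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

public static class StartPositions
{
    // Oldest record still within the stream's retention period.
    public static GetShardIteratorRequest FromOldest(string streamName, string shardId) =>
        new GetShardIteratorRequest
        {
            StreamName = streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.TRIM_HORIZON
        };

    // Only records put after this iterator is created.
    public static GetShardIteratorRequest FromNow(string streamName, string shardId) =>
        new GetShardIteratorRequest
        {
            StreamName = streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.LATEST
        };

    // Records that arrived at or after the given time.
    public static GetShardIteratorRequest Since(string streamName, string shardId, DateTime time) =>
        new GetShardIteratorRequest
        {
            StreamName = streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
            Timestamp = time
        };
}
```

For example, `StartPositions.Since(_streamName, shard.ShardId, DateTime.UtcNow.AddMinutes(-10))` would replay only the last ten minutes instead of the whole retention window.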

A GetRecords[Async] call does not remove records from the stream (see https://stackoverflow.com/a/25741304/4940707); they stay there until the retention period expires and can be read again by any consumer. The correct way of using Kinesis is to move from checkpoint to checkpoint. If the consumer persisted the SequenceNumber of the last record it read and then restarted like this:

ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber

Then you'd see only newer records.
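A minimal sketch of that checkpointing pattern, assuming the AWS SDK for .NET, .NET Core 3.0 or later (for System.Text.Json), and a local JSON file as the checkpoint store; `FileCheckpointStore` and its members are hypothetical names. It remembers the last processed sequence number per shard and, on restart, resumes with AFTER_SEQUENCE_NUMBER so the checkpointed record itself is not delivered again, falling back to TRIM_HORIZON for shards that have no checkpoint yet. In production, the Kinesis Client Library does this bookkeeping for you and keeps its checkpoints in a DynamoDB table.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;

// Hypothetical checkpoint store: shard ID -> last processed sequence number, kept in a JSON file.
public sealed class FileCheckpointStore
{
    private readonly string _path;
    private readonly Dictionary<string, string> _checkpoints;

    public FileCheckpointStore(string path)
    {
        _path = path;
        _checkpoints = File.Exists(path)
            ? JsonSerializer.Deserialize<Dictionary<string, string>>(File.ReadAllText(path))
              ?? new Dictionary<string, string>()
            : new Dictionary<string, string>();
    }

    // Returns the stored sequence number for the shard, or null if there is none.
    public string? Get(string shardId) =>
        _checkpoints.TryGetValue(shardId, out var sequenceNumber) ? sequenceNumber : null;

    // Call after a batch is fully processed, passing the last record's SequenceNumber.
    public void Save(string shardId, string sequenceNumber)
    {
        _checkpoints[shardId] = sequenceNumber;
        File.WriteAllText(_path, JsonSerializer.Serialize(_checkpoints));
    }

    // Resume just after the checkpoint, or from the oldest retained record if there is none.
    public GetShardIteratorRequest IteratorRequestFor(string streamName, string shardId)
    {
        var request = new GetShardIteratorRequest { StreamName = streamName, ShardId = shardId };
        var last = Get(shardId);
        if (last == null)
        {
            request.ShardIteratorType = ShardIteratorType.TRIM_HORIZON;
        }
        else
        {
            request.ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER;
            request.StartingSequenceNumber = last;
        }
        return request;
    }
}
```

In a per-shard read loop you would obtain the iterator with `client.GetShardIteratorAsync(store.IteratorRequestFor(streamName, shardId))` and, after handling a non-empty batch, call `store.Save(shardId, response.Records[response.Records.Count - 1].SequenceNumber)`. Checkpointing once per batch rather than per record keeps the file writes down, at the cost of possibly reprocessing part of a batch after a crash.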

answered Oct 25 '22 03:10 by Misza