I am experimenting with a producer and a consumer using AWS Kinesis, and the issue is that the consumer keeps receiving the first message (or record) we ever produced, even though we have changed the data object we send multiple times. We have also tried multiple ShardIteratorType values and none has worked: LATEST produces no results, and all the others return the same original record.
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Newtonsoft.Json;

namespace ConsoleApp7
{
    internal class Program
    {
        private static AmazonKinesisClient _client;
        private static string _streamName;

        static async Task ReadFromStream()
        {
            var describeRequest = new DescribeStreamRequest
            {
                StreamName = _streamName
            };

            var describeResponse = await _client.DescribeStreamAsync(describeRequest);
            var shards = describeResponse.StreamDescription.Shards;

            foreach (var shard in shards)
            {
                var iteratorRequest = new GetShardIteratorRequest
                {
                    StreamName = _streamName,
                    ShardId = shard.ShardId,
                    ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
                    Timestamp = DateTime.MinValue
                };

                var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
                var iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    var getRequest = new GetRecordsRequest
                    {
                        ShardIterator = iteratorId,
                        Limit = 10000
                    };

                    var getResponse = await _client.GetRecordsAsync(getRequest);
                    var nextIterator = getResponse.NextShardIterator;
                    var records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (var record in records)
                        {
                            var json = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("Json string: " + json);
                        }
                    }

                    iteratorId = nextIterator;
                }
            }
        }

        private static async Task<string> Produce()
        {
            var data = new
            {
                Message = "Hello world!",
                Author = "Amir"
            };

            // Convert to a byte array in preparation for adding to the stream.
            var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

            using (var ms = new MemoryStream(oByte))
            {
                // Create the put request: name the Kinesis stream, give a partition
                // key that is used to place the record in a particular shard, and
                // add the record as a MemoryStream.
                var requestRecord = new PutRecordRequest
                {
                    StreamName = _streamName,
                    PartitionKey = Guid.NewGuid().ToString(),
                    Data = ms
                };

                // PUT the record to Kinesis.
                var response = await _client.PutRecordAsync(requestRecord);
                return response.SequenceNumber;
            }
        }

        static void Main(string[] args)
        {
            _client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);
            _streamName = "SomeStream";

            Produce().Wait();
            ReadFromStream().Wait();
        }
    }
}
First of all, having debugged your code, I noticed that it loops infinitely in the inner loop (while (!string.IsNullOrEmpty(iteratorId))) and never moves on to the remaining shards in your stream (assuming you have more than one). The reason is explained in https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty - because the producer never calls MergeShards or SplitShards, the shards remain open, so NextShardIterator will never be NULL.
This is why you only ever see records put on the first shard (or at least that is what I saw when running your code) - you must read from the shards in parallel, for example as sketched below.
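Here is a minimal sketch of reading every shard concurrently, reusing the _client and _streamName from your code. ReadShardAsync is a hypothetical helper standing in for your existing per-shard GetRecords loop, and the LINQ Select needs using System.Linq;:

static async Task ReadFromStreamInParallel()
{
    var describeResponse = await _client.DescribeStreamAsync(new DescribeStreamRequest
    {
        StreamName = _streamName
    });

    // Start one read loop per shard instead of draining the first open shard forever.
    var shardTasks = describeResponse.StreamDescription.Shards
        .Select(shard => ReadShardAsync(shard.ShardId)) // hypothetical per-shard loop
        .ToList();

    await Task.WhenAll(shardTasks);
}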
As far as your usage pattern goes, you're using:
ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue
With this, you're essentially telling Kinesis "give me all the records in the stream from the beginning of time" (or at least as far back as the retention period reaches). That's why you keep seeing the same old records in addition to the new ones (again, that's what I saw when I ran your code).
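For comparison, the other common iterator types (these enum values exist in Amazon.Kinesis; the behavior notes are standard Kinesis semantics):

ShardIteratorType = ShardIteratorType.LATEST        // only records that arrive AFTER the iterator is created
ShardIteratorType = ShardIteratorType.TRIM_HORIZON  // the oldest record still inside the retention period

This also explains why your LATEST test returned nothing: the record was produced before the iterator was created, so LATEST skipped past it.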
A GetRecords[Async] call does not actually remove records from the stream (see https://stackoverflow.com/a/25741304/4940707). The correct way of using Kinesis is to move from checkpoint to checkpoint. If the consumer persisted the SequenceNumber of the last record it read and then restarted as such:
ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber
then you'd see only newer records. (Note that AT_SEQUENCE_NUMBER would re-deliver the checkpointed record itself; AFTER_SEQUENCE_NUMBER starts right after it.)
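A minimal checkpointing sketch under those assumptions, again reusing _client and _streamName from your code. The checkpoint is only held in memory here and Console.WriteLine stands in for real record processing; a production consumer (for example one built on the KCL) persists checkpoints in DynamoDB:

// Pass the persisted sequence number on restart, or null on a first run.
static async Task ConsumeShard(string shardId, string checkpoint)
{
    var iteratorRequest = checkpoint == null
        ? new GetShardIteratorRequest
        {
            StreamName = _streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.TRIM_HORIZON // first run: start at the oldest record
        }
        : new GetShardIteratorRequest
        {
            StreamName = _streamName,
            ShardId = shardId,
            ShardIteratorType = ShardIteratorType.AFTER_SEQUENCE_NUMBER, // resume just past the checkpoint
            StartingSequenceNumber = checkpoint
        };

    var iterator = (await _client.GetShardIteratorAsync(iteratorRequest)).ShardIterator;

    while (!string.IsNullOrEmpty(iterator))
    {
        var response = await _client.GetRecordsAsync(new GetRecordsRequest
        {
            ShardIterator = iterator,
            Limit = 100
        });

        foreach (var record in response.Records)
        {
            Console.WriteLine(Encoding.UTF8.GetString(record.Data.ToArray()));
            checkpoint = record.SequenceNumber; // persist this after processing in real code
        }

        iterator = response.NextShardIterator;

        // GetRecords returns immediately even when the shard has no new data;
        // back off so you stay under the 5 reads/second/shard limit.
        if (response.Records.Count == 0)
            await Task.Delay(1000);
    }
}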