Multiple different consumers of same Kinesis stream

I have a Kinesis producer that writes a single type of message to a stream. I want to process this stream in multiple, completely different consumer applications: in other words, pub/sub with a single publisher for a given topic/stream. I also want to use checkpointing to ensure that each consumer processes every message written to the stream.

Initially, I was using the same App Name for all consumers and producers. However, I started getting the following error once I started more than one consumer:

com.amazonaws.services.kinesis.model.InvalidArgumentException: StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 used in GetShardIterator on shard shardId-000000000000 in stream PackageCreated under account ************ is invalid because it did not come from this stream. (Service: AmazonKinesis; Status Code: 400; Error Code: InvalidArgumentException; Request ID: ..)

This seems to be because consumers are clashing with their checkpointing as they are using the same App Name.

From reading the documentation, it seems the only way to do pub/sub with checkpointing is by having a stream per consumer application, which requires each producer to know about all possible consumers. This is more tightly coupled than I want; it's really just a queue.

It seems like Kafka supports what I want: arbitrary consumption of a given topic/partition, since consumers are completely in control of their own checkpointing. Is my only option to move to Kafka, or some other alternative, if I want pub/sub with checkpointing?

My RecordProcessor code, which is identical in each consumer:

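import scala.collection.JavaConverters._ // provides .asScala for the java.util.List of records

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  // Records that fail to parse or decode are silently skipped.
  for {
    record <- processRecordsInput.getRecords.asScala
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } subscriber ! msg // side effect only, so no `yield`
  // Checkpoint the whole batch, whether or not every record decoded.
  processRecordsInput.getCheckpointer.checkpoint()
}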

The code parses the message and sends it off to the subscriber. For now, I'm simply marking all messages as successfully received. I can see messages being sent on the AWS Kinesis dashboard, but no reads happen, presumably because each application has its own AppName and doesn't see any other messages.

CalumMcCall asked Jan 06 '23 11:01


1 Answer

The pattern you want, one publisher writing to a single Kinesis stream and multiple independent consumers reading from it, is supported. You don't need a separate stream per consumer.

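How do you do that? Give every consumer application a different application name. The KCL keeps its leases and checkpoints in a DynamoDB table named after the application, so with distinct names the checkpointing info of one consumer won't collide with that of another.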
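A minimal sketch of that setup in Scala, assuming KCL 1.x (the version whose ProcessRecordsInput appears in the question). The consumer names "billing" and "audit", the "package-created-" prefix, and the helper workerFor are hypothetical, chosen only for illustration; the stream name PackageCreated is taken from the error message in the question. Credentials and region are assumed to come from the default provider chain and the KCL defaults.

```scala
import java.util.UUID

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{
  InitialPositionInStream, KinesisClientLibConfiguration, Worker
}

object ConsumerSetup {
  // Stream name as it appears in the question's error message.
  val streamName = "PackageCreated"

  // Hypothetical helper: builds a KCL worker for one consumer application.
  // All consumers share the stream; only the application name differs.
  def workerFor(consumerName: String, factory: IRecordProcessorFactory): Worker = {
    // Unique per consumer application, so each one gets its own
    // lease/checkpoint table in DynamoDB.
    val applicationName = s"package-created-$consumerName"
    // Unique per running instance of that application.
    val workerId = s"$consumerName-${UUID.randomUUID()}"

    val config = new KinesisClientLibConfiguration(
      applicationName,
      streamName,
      new DefaultAWSCredentialsProviderChain(),
      workerId
    ).withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)

    new Worker.Builder()
      .recordProcessorFactory(factory)
      .config(config)
      .build()
  }
}
```

One application would then call something like ConsumerSetup.workerFor("billing", billingFactory).run() and another ConsumerSetup.workerFor("audit", auditFactory).run(), where the factories are each application's own. The initial position only matters the first time an application name is used; after that the stored checkpoint takes over. KCL 1.x defaults to LATEST, so a newly named application only sees records written after it starts, which is one possible reason a fresh consumer appears to read nothing.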

See the first response in this AWS forum thread: https://forums.aws.amazon.com/message.jspa?messageID=554375

ketan vijayvargiya answered Jan 13 '23 20:01