Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Alpakka KinesisSink : Can not push messages to Stream

I am trying to use the alpakka kinesis connector to send messages to a Kinesis Stream but I have no success with it. I tried the code below but nothing in my stream.

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
    println(reqEntry)
    reqEntry
}

val entry = new PutRecordsRequestEntry()
    .withData(ByteBuffer.wrap("Hello World".getBytes))
    .withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
  • Using a Sink.foreach(println) instead of KinesisSink prints out the PutRecordsRequestEntry every 1 second => EXPECTED
  • Using KinesisSink, the entry is generated only once.

What Am I doing wrong ?

I am checking my stream with a KinesisSource and reading is working ( tested with another stream)

Also the monitoring dashboard of AWS Kinesis doesnt show any PUT requests.

Note 1: I tried to enable the debug log of alpakka but with no effect

<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>

in my logback.xml + debug on root level

like image 663
ccheneson Avatar asked Jan 03 '18 16:01

ccheneson


2 Answers

Some troubleshooting steps to consider below - I hope they help.

I suspect you're likely missing credentials and/or region configuration for your Kinesis client.

Kinesis Firehose

The Kinesis Producer Library (what Alpakka seems to be using) does not work with Kinesis Firehose. If you're trying to write to Firehose this isn't going to work.

Application Logging

You'll probably want to enable logging for the Kinesis Producer Library, not just in Alpakka itself. Relevant documentation is available here:

Configuring the Kinesis Producer Library

Configuration Defaults for Kinesis Producer Library

AWS Side Logging

AWS CloudTrail is automatically enabled out of the box for Kinesis streams, and by default AWS will keep 90 days of CloudTrail logs for you.

https://docs.aws.amazon.com/streams/latest/dev/logging-using-cloudtrail.html

You can use the CloudTrail logs to see the API calls your application is making to Kinesis on your behalf. There's usually a modest delay in requests showing up - but this will let you know if the request is failing due to insufficient IAM permissions or some other issue with your AWS resource configuration.

Check SDK Authentication

The Kinesis client will be using the DefaultAWSCredentialsProviderChain credentials provider to make requests to AWS.

You'll need to make sure you are providing valid AWS credentials with IAM rights to make those requests to Kinesis. If your code is running on AWS, the preferred way of giving your application credentials is using IAM Roles (specified at instance launch time).

You'll also need to specify the AWS Region when building the client in your code. Use your application.properties for configuring this, or if your application is part of a CloudFormation stack that lives in a single region - using the instance metadata service to retrieve the current region when your code is running on AWS.

like image 73
bly Avatar answered Oct 11 '22 13:10

bly


The problem was an access denied / permission on the action on the stream.

I had to add the akka actor config for logging

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}

to see debug lines and I actually run in debug and step in each stage.

It required permission "PutRecords" in the IAM role

like image 2
ccheneson Avatar answered Oct 11 '22 12:10

ccheneson