I am trying to use the Alpakka Kinesis connector to send messages to a Kinesis stream, but without success. I tried the code below, but nothing arrives in my stream.
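import java.nio.ByteBuffer
import scala.concurrent.duration._
import scala.util.Random
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kinesis.KinesisFlowSettings
import akka.stream.alpakka.kinesis.scaladsl.KinesisSink
import akka.stream.scaladsl.{Flow, Sink, Source}
import com.amazonaws.services.kinesis.{AmazonKinesisAsync, AmazonKinesisAsyncClientBuilder}
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
// Pass-through stage that prints every entry flowing through the stream
val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
  println(reqEntry)
  reqEntry
}
val entry = new PutRecordsRequestEntry()
  .withData(ByteBuffer.wrap("Hello World".getBytes))
  .withPartitionKey(Random.nextInt.toString)
Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()
// 2) Source.tick(1.second, 1.second, entry).via(debug).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()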
Using Sink.foreach(println) instead of KinesisSink prints out the PutRecordsRequestEntry every 1 second, as expected. With KinesisSink, the entry is generated only once. What am I doing wrong?
I am checking my stream with a KinesisSource, and reading works (tested with another stream).
Also, the monitoring dashboard of AWS Kinesis doesn't show any PUT requests.
Note 1: I tried to enable the debug log of Alpakka by adding
<logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>
to my logback.xml, plus DEBUG on the root level, but with no effect.
Some troubleshooting steps to consider below - I hope they help.
I suspect you're missing credentials and/or region configuration for your Kinesis client.
Kinesis Firehose
The Kinesis Producer Library (what Alpakka seems to be using) does not work with Kinesis Firehose. If you're trying to write to Firehose this isn't going to work.
Application Logging
You'll probably want to enable logging for the Kinesis Producer Library, not just in Alpakka itself. Relevant documentation is available here:
Configuring the Kinesis Producer Library
Configuration Defaults for Kinesis Producer Library
AWS Side Logging
AWS CloudTrail is automatically enabled out of the box for Kinesis streams, and by default AWS will keep 90 days of CloudTrail logs for you.
https://docs.aws.amazon.com/streams/latest/dev/logging-using-cloudtrail.html
You can use the CloudTrail logs to see the API calls your application is making to Kinesis on your behalf. There's usually a modest delay in requests showing up - but this will let you know if the request is failing due to insufficient IAM permissions or some other issue with your AWS resource configuration.
Check SDK Authentication
The Kinesis client will be using the DefaultAWSCredentialsProviderChain credentials provider to make requests to AWS.
You'll need to make sure you are providing valid AWS credentials with IAM rights to make those requests to Kinesis. If your code is running on AWS, the preferred way of giving your application credentials is using IAM Roles (specified at instance launch time).
You'll also need to specify the AWS Region when building the client in your code. Use your application.properties for configuring this or, if your application is part of a CloudFormation stack that lives in a single region, use the instance metadata service to retrieve the current region when your code is running on AWS.
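As a minimal sketch of the two points above, the client can be built with an explicit credentials provider and region instead of relying on defaultClient(). It assumes the AWS SDK for Java v1 that the question already uses; the object name KinesisClientFactory, its two methods, and the EU_WEST_1 region in the usage line are hypothetical placeholders, not part of the original code.

```scala
import scala.util.Try

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.Regions
import com.amazonaws.services.kinesis.{AmazonKinesisAsync, AmazonKinesisAsyncClientBuilder}

// Hypothetical helper object, for illustration only.
object KinesisClientFactory {

  // Builds the async client with an explicit region and the default credentials chain.
  def build(region: Regions): AmazonKinesisAsync =
    AmazonKinesisAsyncClientBuilder
      .standard()
      .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
      .withRegion(region)
      .build()

  // True if some provider in the default chain can supply credentials.
  // getCredentials throws when no provider succeeds, hence the Try.
  def credentialsResolvable(): Boolean =
    Try(DefaultAWSCredentialsProviderChain.getInstance().getCredentials).isSuccess
}
```

In the question's code this would replace the defaultClient() call, for example `implicit val kinesisAsync: AmazonKinesisAsync = KinesisClientFactory.build(Regions.EU_WEST_1)`. Note that resolvable credentials only prove the SDK found an identity; whether that identity is allowed to write to the stream is a separate IAM question.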
The problem was an access-denied error: the IAM role lacked permission for the action on the stream.
I had to add the Akka actor config for logging
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  logger-startup-timeout = "30s"
}
to see the debug lines. I also ran the application in a debugger and stepped into each stage.
It required the "PutRecords" permission (the kinesis:PutRecords action) in the IAM role.
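A failure like this is easy to miss because `.to(KinesisSink(...)).run()` discards any signal of how the stream ended. One way to surface it without a debugger, sketched here under assumptions, is to route the entries through KinesisFlow and keep the Future[Done] of the final sink, which fails when the flow fails. The sketch assumes the same Alpakka version as the question (KinesisFlow in akka.stream.alpakka.kinesis.scaladsl, KinesisFlowSettings.defaultInstance); the object name KinesisProbe, the helper newEntry, and the partition key "probe" are hypothetical.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kinesis.KinesisFlowSettings
import akka.stream.alpakka.kinesis.scaladsl.KinesisFlow
import akka.stream.scaladsl.{Sink, Source}
import com.amazonaws.services.kinesis.{AmazonKinesisAsync, AmazonKinesisAsyncClientBuilder}
import com.amazonaws.services.kinesis.model.{PutRecordsRequestEntry, PutRecordsResultEntry}

// Hypothetical probe, for illustration only.
object KinesisProbe {

  // Builds a fresh entry per tick so no ByteBuffer is shared between requests.
  def newEntry(payload: String, partitionKey: String): PutRecordsRequestEntry =
    new PutRecordsRequestEntry()
      .withData(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)))
      .withPartitionKey(partitionKey)

  def main(args: Array[String]): Unit = {
    implicit val sys: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
    import sys.dispatcher

    // The materialized Future completes when the stream ends and fails if a stage fails.
    val done: Future[Done] =
      Source.tick(1.second, 1.second, ())
        .map(_ => newEntry("Hello World", "probe"))
        .via(KinesisFlow("myStreamName", KinesisFlowSettings.defaultInstance))
        .runWith(Sink.foreach[PutRecordsResultEntry] { result =>
          println(s"stored with sequence number ${result.getSequenceNumber}")
        })

    done.onComplete {
      case Success(_)  => println("stream completed"); sys.terminate()
      case Failure(ex) => println(s"stream failed: $ex"); sys.terminate()
    }
  }
}
```

With a missing permission, the expectation is that the failure branch prints the exception raised by the PutRecords call rather than the stream going quiet after the first element, though the exact exception type depends on the Alpakka and SDK versions in use.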