So far I have followed the instructions documented for Flink's Kinesis connector to use a local Kinesis.
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
With a Flink producer, these instructions work with a local Kinesis (I use Kinesalite).
However, with a Flink consumer, I get an exception saying that aws.region and aws.endpoint cannot both be set. But the region is required, which means it's not possible to override the endpoint.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
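To make the conflict concrete, here is a minimal sketch of the kind of check that message implies. It assumes the consumer accepts exactly one of the two keys (an exclusive-or), which the wording suggests but which I have not confirmed against the connector source. The class and method names are hypothetical, and only the two key strings come from the exception text.

```java
import java.util.Properties;

public class ConsumerConfigCheck {
    // Key names taken from the exception message above.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    // Hypothetical stand-in for the consumer's validation (an assumption,
    // not Flink's actual code): true only when exactly one key is present.
    static boolean hasExactlyOneOfRegionOrEndpoint(Properties config) {
        return config.containsKey(AWS_REGION) ^ config.containsKey(AWS_ENDPOINT);
    }

    public static void main(String[] args) {
        // Same combination as the producer config: region plus local endpoint.
        Properties both = new Properties();
        both.put(AWS_REGION, "us-east-1");
        both.put(AWS_ENDPOINT, "http://localhost:4567");

        // Local endpoint only, with no region.
        Properties endpointOnly = new Properties();
        endpointOnly.put(AWS_ENDPOINT, "http://localhost:4567");

        System.out.println("both set passes: " + hasExactlyOneOfRegionOrEndpoint(both));
        System.out.println("endpoint only passes: " + hasExactlyOneOfRegionOrEndpoint(endpointOnly));
    }
}
```

Under that assumption, the producer-style config with both keys fails the check, while a config with only the endpoint passes.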
Is this a bug in the connector? I see a related PR: https://github.com/apache/flink/pull/6045.
I found a workaround on Flink's mailing list, but it describes this as an issue for the producer and not the consumer, whereas I see the opposite (I think), so I'm not sure about it. It's really confusing.
Some progress has been made since this question was asked.
The asker raised the issue in this Jira ticket, which was marked as a duplicate of this second Jira ticket.
The issue should now be resolved, with the fix available in version 1.10 and above.