This issue is now solved on Message Hub
I am having some trouble creating a KTable in Kafka. I am new to Kafka, which is probably the root of my problem, but I thought I could ask here anyway. I have a project where I would like to keep track of different IDs by counting their total occurrences. I am using Message Hub on IBM Cloud to manage my topics, and it has worked splendidly so far.
I have a topic on Message Hub that contains messages like {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}. For now, the only key of relevance is ID.
My Kafka code, along with the Streams configuration, looks like this:
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Basic Streams settings; keys and values are read as plain strings
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");
// Security settings for connecting to Message Hub
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);
// Topology: extract the ID from each message, re-key by ID, and count per ID
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> Kstreams = builder.stream(myTopic);
KTable<String, Long> eventCount = Kstreams
.flatMapValues(value -> getID(value)) //function that retrieves the ID
.groupBy((key, value) -> value) // changing the key makes Streams create a repartition topic
.count();
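The getID function is not shown above. As a rough idea of what it does, here is a minimal sketch that uses only the standard library. The class name IdExtractor and the regex approach are assumptions for illustration; a real implementation would more likely use a JSON parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the getID helper; not the code from the actual project.
final class IdExtractor {
    // Matches "ID":"<value>" and captures the value.
    // Assumes the ID is a quoted string that contains no escaped quotes.
    private static final Pattern ID_PATTERN =
            Pattern.compile("\"ID\"\\s*:\\s*\"([^\"]*)\"");

    // Returns every ID found in the message; an empty list if there is none.
    // flatMapValues accepts any Iterable, so a List works as the return type.
    static List<String> getID(String value) {
        List<String> ids = new ArrayList<>();
        if (value == null) {
            return ids;
        }
        Matcher matcher = ID_PATTERN.matcher(value);
        while (matcher.find()) {
            ids.add(matcher.group(1));
        }
        return ids;
    }
}
```

Returning an empty list for messages without an ID means flatMapValues simply drops them instead of counting a null key.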
When I run the code, I get the following error(s):
Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.
Followed by:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]
I have no idea why this error occurs or what could be done about it. Is the way I have built the KStream and KTable incorrect somehow? Or is the problem perhaps with Message Hub on Bluemix?
Solved:
Adding an extract from the comments below the answer I have marked as correct. It turned out that my StreamsConfig was fine and that there is (for now) an issue on Message Hub's side, but there is a workaround:
It turns out Message Hub has an issue when creating repartition topics with Kafka Streams 1.1. While we work on a fix, you'll need to create the topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition by hand. It needs as many partitions as your input topic (myTopic) and set the retention time to the max. I'll post another comment once it's fixed
Many thanks for the help!
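For anyone applying the same workaround with a different application ID: Kafka Streams names its repartition topics <application.id>-<name>-repartition. The small sketch below only builds that string; the class and method names are made up for illustration, and "KTableTest" is assumed to be the application ID because it is the prefix of the thread name in the error above.

```java
// Hypothetical helper that builds the name of the repartition topic to create by hand.
final class RepartitionTopicName {
    // Pattern used by Kafka Streams: <application.id>-<name>-repartition
    static String of(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }
}
```

Called with "KTableTest" and "KSTREAM-AGGREGATE-STATE-STORE-0000000003", this gives the topic name from the error message. The numeric suffix is generated from the topology, so it can change if operators are added or removed before the count.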
Message Hub has some restrictions on the configurations that can be used when creating topics.
From the PolicyViolationException you received, it looks like your Streams application tried to use a few configs we don't allow: segment.index.bytes, segment.bytes and segment.ms.
I'm guessing you set those somewhere in your Streams configuration and they should be removed.
Note that you also need to set StreamsConfig.REPLICATION_FACTOR_CONFIG to 3 in your config to work with Message Hub, as mentioned in our docs.
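To make the exception easier to read, here is a minimal sketch of the kind of check it describes: any requested topic config that is not in the allowed list is rejected. This is only an illustration, not Message Hub's actual implementation; the class name TopicConfigCheck is made up, and the allowed list is taken from the exception text.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Illustrative only: mimics the allow-list check reported in the PolicyViolationException.
final class TopicConfigCheck {
    // Allowed configs as listed in the exception message.
    static final Set<String> ALLOWED =
            new HashSet<>(Arrays.asList("retention.ms", "cleanup.policy"));

    // Returns the requested config keys that are not allowed, in sorted order.
    static SortedSet<String> disallowed(Map<String, String> requested) {
        SortedSet<String> result = new TreeSet<>(requested.keySet());
        result.removeAll(ALLOWED);
        return result;
    }
}
```

For the configs in the exception above, the result contains segment.bytes, segment.index.bytes and segment.ms, while cleanup.policy passes the check.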