I have created a Kafka Topic and pushed a message to it.
So
bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-"
prints
key1-customer1
on the command line.
I want to create a Kafka Stream out of this topic and want to print this key1-customer1
on the console.
I wrote the following for it:
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// For illustrative purposes we disable record caches
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> customerStream = builder.stream("myTopic");
customerStream.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
This does not fail. However, this does not print anything on the console as this answer suggests.
I am new to Kafka. So any suggestions to make this work would help me a lot.
stream("myTopic"); customerStream. foreach(new ForeachAction<String, String>() { public void apply(String key, String value) { System. out. println(key + ": " + value); } }); final KafkaStreams streams = new KafkaStreams(builder.
You'll take an existing KStream object and use the toTable() method to covert it into a KTable . This new method (as of Apache Kafka 2.5) allows you to simply convert a record stream to a changelog stream. In this case you've materialized the KTable , so it's available for you to use Interactive Queries.
Lets say there are 8000 records in KStream, 14 records in KTable and Assuming that for each key in KStreams there is a record in KTable. So the expected output would be 8000 records.
KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world. For example a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.
TL;DR Use Printed.
import org.apache.kafka.streams.kstream.Printed
val sysout = Printed
.toSysOut[String, String]
.withLabel("customerStream")
customerStream.print(sysout)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With