Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to print KStream to console?

I have created a Kafka Topic and pushed a message to it.

So

bin/kafka-console-consumer --bootstrap-server abc.xyz.com:9092 --topic myTopic --from-beginning --property print.key=true --property key.separator="-"

prints

key1-customer1

on the command line.

I want to create a Kafka Stream out of this topic and want to print this key1-customer1 on the console.

I wrote the following for it:

final Properties streamsConfiguration = new Properties();

streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-id");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "client-id");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "abc.xyz.com:9092");

streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
// For illustrative purposes we disable record caches
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> customerStream = builder.stream("myTopic");
customerStream.foreach(new ForeachAction<String, String>() {
    public void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
});

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

streams.start();   

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

This does not fail. However, this does not print anything on the console as this answer suggests.

I am new to Kafka. So any suggestions to make this work would help me a lot.

like image 565
Vicky Avatar asked Dec 21 '17 07:12

Vicky


People also ask

How do I print from KStream?

stream("myTopic"); customerStream. foreach(new ForeachAction<String, String>() { public void apply(String key, String value) { System. out. println(key + ": " + value); } }); final KafkaStreams streams = new KafkaStreams(builder.

How do you convert KStream to KTable?

You'll take an existing KStream object and use the toTable() method to covert it into a KTable . This new method (as of Apache Kafka 2.5) allows you to simply convert a record stream to a changelog stream. In this case you've materialized the KTable , so it's available for you to use Interactive Queries.

What is the output of KStream KTable join?

Lets say there are 8000 records in KStream, 14 records in KTable and Assuming that for each key in KStreams there is a record in KTable. So the expected output would be 8000 records.

What is KStream in Kafka?

KStream is an abstraction of a record stream of KeyValue pairs, i.e., each record is an independent entity/event in the real world. For example a user X might buy two items I1 and I2, and thus there might be two records <K:I1>, <K:I2> in the stream.


1 Answers

TL;DR Use Printed.

import org.apache.kafka.streams.kstream.Printed
val sysout = Printed
  .toSysOut[String, String]
  .withLabel("customerStream")
customerStream.print(sysout)
like image 98
Jacek Laskowski Avatar answered Oct 19 '22 22:10

Jacek Laskowski