Kafka Streams: how to write to a topic?

In Kafka Streams, what's the canonical way of producing/writing a stream? In Spark, there is the custom receiver, which works as a long-running adapter to an arbitrary data source. What is the equivalent in Kafka Streams?

To be specific, I'm not asking how to do transforms from one topic to another. The documentation is very clear on that. I want to understand how to write my workers that will be doing the first write in a series of transforms into Kafka.

I expect to be able to do

builder1.<something>(<some intake worker like a Spark receiver>)
       .to(topic1)
       .start()

builder2.from(topic1)
        .transform(<some transformation function>)
        .to(topic2)
        .start()

But none of the existing documentation shows this. Am I missing something?

asked Jun 29 '16 by dmead


2 Answers

Depends on whether you are using the Kafka Streams DSL or Processor API:

  • Kafka Streams DSL: You can use KStream#to() to materialize the KStream to a topic. This is the canonical way to write data to a topic. Alternatively, you can use KStream#through(). This also materializes data to a topic, but returns the resulting KStream for further use. The only difference between #to() and #through(), then, is that #through() saves you a KStreamBuilder#stream() call if you want the materialized topic back as a KStream.

  • Processor API: You materialize data to a topic by forwarding the records to a sink processor.

Either way, a crucial thing to note is that data is not materialized to a topic until you write it using one of the methods above. map(), filter(), etc. do not materialize data; the records remain in the processor task/thread/memory until they are written out by one of the methods above.
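
To make the difference concrete, here is a minimal DSL sketch (topic names are hypothetical; the 0.10-era KStreamBuilder API mentioned above is assumed, with String serdes configured as the defaults):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("input-topic");

// to(): a terminal operation that materializes the stream to a topic.
source.mapValues(value -> value.toUpperCase())
      .to("output-topic");

// through(): materializes to a topic AND returns the stream for further use,
// saving a separate builder.stream("intermediate-topic") call.
source.through("intermediate-topic")
      .filter((key, value) -> value != null)
      .to("filtered-topic");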


To produce data into Kafka (which a Kafka Streams application can then consume):

import java.util.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import static org.apache.kafka.clients.producer.ProducerConfig.*;

Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // the broker address, not ZooKeeper's 2181
producerConfig.put(ACKS_CONFIG, "all");
producerConfig.put(RETRIES_CONFIG, 0);
Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer());

and then:

Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)));
producer.close();

You will need:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
</dependency>
answered Nov 15 '22 by Dmitry Minkovsky


"I want to understand how to write my workers that will be doing the first write in a series of transforms into Kafka."

The initial write (= input data) should not be done via Kafka Streams. Kafka Streams assumes that the input data is already in Kafka.

So this expected workflow of yours is not applicable:

builder1.<something>(<some intake worker like a Spark receiver>)
   .to(topic1)
   .start()

Rather, you'd use something like Kafka Connect to get data into Kafka (e.g., from a database into a Kafka topic), or use the "normal" Kafka producer clients (Java, C/C++, Python, ...) to write the input data into Kafka; a minimal Connect example is sketched below.
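
For the Connect route, here is a minimal sketch of a standalone file-source connector configuration (the file path and topic name are hypothetical) that tails a file and writes each line into the topic your Streams job reads from; run it with bin/connect-standalone.sh alongside a worker config:

# file-source.properties (hypothetical example)
name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/input.txt
topic=topic1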

There's no "hook" yet available in Kafka Streams to bootstrap the input data. We're looking at a better integration of Kafka Connect and Kafka Streams, so this situation may well improve in the near future.

answered Nov 15 '22 by Michael G. Noll