 

Streaming messages from one Kafka Cluster to another

I'm currently trying to find an easy way to stream messages from a topic on one Kafka cluster to another one (remote -> local cluster).
The idea is to use Kafka Streams directly, so that we don't need to replicate the actual messages to the local cluster but only get the "results" of the Kafka Streams processing into our Kafka topics.

So let's say the WordCount demo runs against a Kafka instance on another PC than my own. I also have a Kafka instance running on my local machine.
Now I want the WordCount demo to run on the topic (on the "remote" instance) containing the sentences whose words should be counted.
The counts, however, should be written to a topic on my local system instead of a "remote" topic.

Is something like this doable with the Kafka Streams API?
E.g.

// hypothetical two-config constructor: one for the remote, one for the local cluster
val builder: KStreamBuilder = new KStreamBuilder(remoteStreamConfig, localStreamConfig)
val textLines: KStream[String, String] =
  builder.stream("remote-input-topic", remoteStreamConfig) // hypothetical per-topic config
val wordCounts: KTable[String, Long] = textLines
  .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
  .groupBy((_, word) => word)
  .count("word-counts")

wordCounts.to(stringSerde, longSerde, "local-output-topic", localStreamConfig) // hypothetical per-topic config

val streams: KafkaStreams = new KafkaStreams(builder)
streams.start()

Thank you very much
- Tim

Tim.G. · asked Dec 15 '17

People also ask

How do I copy data from one Kafka cluster to another?

If you want to replicate data from one cluster to another, Kafka ships with a tool called MirrorMaker for mirroring data between clusters. The tool reads from a source cluster and writes to a destination cluster.
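For reference, a typical invocation of the legacy MirrorMaker tool looks roughly like this; the property files and topic pattern below are placeholders, not taken from the question:

bin/kafka-mirror-maker.sh \
  --consumer.config source-cluster-consumer.properties \
  --producer.config target-cluster-producer.properties \
  --whitelist "remote-input-topic"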

Can a Kafka consumer read from multiple clusters?

A consumer group, such as a Kafka Streams-based application, can process data from a single Kafka cluster only. Therefore, multi-topic subscriptions or load balancing across the consumers in a consumer group are possible only within a single Kafka cluster.

What component of Kafka helps with replication from cluster to another?

The replication factor defines how many copies of a topic are maintained across the Kafka cluster. It is defined at the topic level, and replication takes place at the partition level.

Is Kafka Streams multithreaded?

Kafka Streams allows the user to configure the number of threads that the library can use to parallelize processing within an application instance. Each thread can execute one or more stream tasks with their processor topologies independently.
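As a minimal sketch, the thread count is set through StreamsConfig; the application id and broker address below are placeholders:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app")     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
// Run four processing threads inside this application instance.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")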


1 Answer

Kafka Streams is built for a single cluster only.

A workaround is to use foreach() (or similar) and instantiate your own KafkaProducer that writes to the target cluster. Note that your own producer must use synchronous writes! Otherwise, you might lose data in case of a failure. Thus, it's not a very performant solution.
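A minimal sketch of this workaround, reusing the wordCounts table from the question; it assumes Scala 2.12+ (for the lambda conversion), and the broker address and topic name are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Producer pointing at the *target* cluster; address and topic are placeholders.
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "target-cluster:9092")
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.LongSerializer")
val targetProducer = new KafkaProducer[String, java.lang.Long](producerProps)

wordCounts.toStream.foreach { (word: String, count: java.lang.Long) =>
  // Synchronous write: block on the Future so a failed send surfaces here.
  targetProducer.send(new ProducerRecord("local-output-topic", word, count)).get()
}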

It's better to just write the result to the source cluster and replicate the data to the target cluster. Note that you can most likely use a much shorter retention period for the output topic in the source cluster, as the actual data is stored with a longer retention time in the target cluster anyway. This lets you limit the required storage on the source cluster.
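For illustration, one way to give the output topic on the source cluster a short retention is at creation time via the AdminClient (available since Kafka 0.11); the broker address, topic name, partition count, replication factor, and retention value below are all assumed placeholders:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "source-cluster:9092") // placeholder
val admin = AdminClient.create(adminProps)

// Example values: 3 partitions, replication factor 3, keep data for one hour only,
// since the target cluster holds the long-term copy anyway.
val outputTopic = new NewTopic("output-topic", 3, 3.toShort)
outputTopic.configs(Collections.singletonMap("retention.ms", "3600000"))
admin.createTopics(Collections.singleton(outputTopic))
admin.close()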

Edit (reply to comment below from @quickinsights)

what if your Kafka streams service is down for longer period than the retention

That seems to be an orthogonal concern that can be raised for any design. In general, retention time should be set depending on your maximum downtime to avoid data loss. Note, though, that because the application reads from and writes to the source cluster, and only the output topic in the source cluster may be configured with a small retention time, nothing bad happens if the application goes down: the input topic simply isn't processed and no new output data is produced. You only need to worry about the case in which your replication pipeline into the target cluster goes down -- set the retention time of the output topic in the source cluster accordingly to make sure you don't lose any data.

It also doubles your writes back to Kafka.

Yes. It also increases the storage footprint on disk. It's a tradeoff (as always) between application resilience and runtime performance vs. cluster load. Your choice. I would personally recommend going with the more resilient option, as pointed out above. It's easier to scale out your Kafka cluster than to handle all the resilience edge cases in your application code.

That seems super inefficient

That's a personal judgment call. It's a tradeoff and there is no objective right or wrong.

Matthias J. Sax · answered Nov 15 '22