
Aggregate over multiple partitions in Kafka Streams

This is, in part, a follow-up to "Aggregation over a specific partition in Apache Kafka Streams".

Let's suppose I have a topic named "events" with 3 partitions on which I send string -> integer data like so:

(Bob, 3) on partition 1

(Sally, 4) on partition 2

(Bob, 2) on partition 3

...

I would like to aggregate the values (in this example, just a simple sum) across all partitions to end up with a KTable that looks something like:

(Sally, 4)

(Bob, 5)

As mentioned in the answer to the question I linked to above, it's not possible to directly do this kind of cross-partition aggregation. However, the answerer mentioned that it was possible if the messages have the same keys (which is true in this case). How might this be accomplished?

I would also like to be able to query these aggregate values from a "global" state store that is replicated across each instance of the Kafka Streams application.

My first thought was to use a GlobalKTable (which I believe, according to this page, should be what I need). However, the changelog topic for this state store has the same number of partitions as the original "events" topic, and simply does the aggregation on a per-partition basis rather than across all partitions.

This is a slimmed down version of my application - not really sure where to go from here:

final Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-aggregator");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, CustomDoubleSerde.class.getName());
streamsConfig.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 0);
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, Double> eventStream = builder.stream(INCOMING_EVENTS_TOPIC);
KTable<String, Double> aggregatedMetrics = eventStream
        .groupByKey()
        .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

aggregatedMetrics.toStream().print(Printed.<String, Double>toSysOut());
aggregatedMetrics.toStream().to(METRIC_CHANGES_TOPIC);

// Register the global table before build(), so that it is part of the topology the application runs.
builder.globalTable(METRIC_CHANGES_TOPIC, Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(METRICS_STORE_NAME));

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    streams.close();
}));
Robert Herhold asked Jun 03 '18 19:06


1 Answer

Kafka Streams assumes that input topics are partitioned by key, i.e., that all records with the same key are in the same partition. This assumption does not hold in your case (Bob appears on partitions 1 and 3), so you need to tell Kafka Streams about it.

In your particular case, you would replace groupByKey() with groupBy():

KTable<String, Double> aggregatedMetrics = eventStream
    .groupBy((k,v) -> k)
    .aggregate(() -> 0d, (key, value, aggregate) -> value + aggregate);

The lambda is a dummy that does not modify the key; however, it is a hint to Kafka Streams to re-partition the data by key before doing the aggregation.
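
To see why re-partitioning matters, here is a minimal, Kafka-free sketch in plain Java. It is only a toy model: the class RepartitionSketch, the Event type, and both helper methods are made up for illustration, and the key-to-partition mapping uses String.hashCode() as a stand-in for Kafka's real partitioner. It sums per partition first on the layout from the question, then again after routing every record by its key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RepartitionSketch {

    /** A keyed record as it would sit on one topic partition (hypothetical type). */
    static final class Event {
        final String key;
        final double value;

        Event(String key, double value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Sums values per key within each partition separately, since each task only sees its own partition. */
    static List<Map<String, Double>> sumPerPartition(List<List<Event>> partitions) {
        List<Map<String, Double>> sums = new ArrayList<>();
        for (List<Event> partition : partitions) {
            Map<String, Double> perKey = new HashMap<>();
            for (Event e : partition) {
                perKey.merge(e.key, e.value, Double::sum);
            }
            sums.add(perKey);
        }
        return sums;
    }

    /** Routes every record to a partition derived from its key only (assumption: plain hashCode, not Kafka's partitioner). */
    static List<List<Event>> repartitionByKey(List<List<Event>> partitions, int numPartitions) {
        List<List<Event>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (List<Event> partition : partitions) {
            for (Event e : partition) {
                out.get(Math.floorMod(e.key.hashCode(), numPartitions)).add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The layout from the question: Bob's records sit on two different partitions.
        List<List<Event>> events = List.of(
                List.of(new Event("Bob", 3)),
                List.of(new Event("Sally", 4)),
                List.of(new Event("Bob", 2)));

        System.out.println("without re-partitioning: " + sumPerPartition(events));
        System.out.println("after re-partitioning:   " + sumPerPartition(repartitionByKey(events, 3)));
    }
}
```

Without re-partitioning, Bob ends up with two separate partial sums (3 and 2). After routing by key, both of Bob's records land on the same partition, so a per-partition sum yields the single total of 5.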

About GlobalKTable: this is a special kind of table that is not the result of an aggregation, but is only populated from a changelog topic. It seems your code is doing the right thing already: write the aggregation result into a topic and re-read the topic as a GlobalKTable.
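
As a rough illustration of what "populated from a changelog topic" means, the following plain-Java sketch replays a list of updates into a map, where the latest value per key wins. It does not use the Kafka Streams API; the class GlobalTableSketch, the method materialize, and the two "instances" are hypothetical names, and the changelog contents are the running sums the aggregation from the question would emit.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GlobalTableSketch {

    /** Replays changelog records in order; a later value for a key overwrites the earlier one. */
    static Map<String, Double> materialize(List<Map.Entry<String, Double>> changelog) {
        Map<String, Double> table = new LinkedHashMap<>();
        for (Map.Entry<String, Double> update : changelog) {
            table.put(update.getKey(), update.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        // Updates as the aggregation would emit them: running sums, not raw events.
        List<Map.Entry<String, Double>> changelog = List.of(
                Map.entry("Bob", 3.0),
                Map.entry("Sally", 4.0),
                Map.entry("Bob", 5.0));

        // Two hypothetical application instances each read the whole topic, not just one partition.
        Map<String, Double> instanceA = materialize(changelog);
        Map<String, Double> instanceB = materialize(changelog);

        System.out.println(instanceA);                    // {Bob=5.0, Sally=4.0}
        System.out.println(instanceB.equals(instanceA));  // true
    }
}
```

Because every instance reads all partitions of the topic, each one ends up with the same complete table, which is what makes the store usable as a replicated lookup.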

Matthias J. Sax answered Sep 18 '22 03:09