Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

How do i use .groupby with Multiple constrainst in Kafka Streams API. Same as in Java 8 Streams API example below

public void twoLevelGrouping(List<Person> persons) {
     final Map<String, Map<String, List<Person>>> personsByCountryAndCity = persons.stream().collect(
         groupingBy(Person::getCountry,
            groupingBy(Person::getCity)
        )
    );
    System.out.println("Persons living in London: " + personsByCountryAndCity.get("UK").get("London").size());
}
like image 277
Niraj Sanghani Avatar asked Feb 22 '18 12:02

Niraj Sanghani


1 Answers

You specify a combined key by putting all attributes/fields you want to group-by into the key.

KTable table = stream.selectKey((k, v,) -> k::getCountry + "-" + k::getCity)
                     .groupByKey()
                     .aggregate(...); // or maybe .reduce()

I just assumed that country and city are both String. You use Interactive Queries to query the store with

store.get("UK-London");

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html

like image 114
Matthias J. Sax Avatar answered Sep 28 '22 08:09

Matthias J. Sax