I'm receiving events which end up in Kafka. From these events I fetch the id using a Kafka Streams application and post it back to Kafka as a pair of (id, 1) in another topic. Then I would like to check whether the id already exists in ElasticSearch: if it does, update its counter; otherwise create a new record with the id from Kafka and the counter set to 1, i.e. an upsert of the record (id, 1) to ES.
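For context, this is roughly what the Streams step looks like. It is a minimal sketch, assuming a recent Kafka Streams API, plain-string events, and placeholder names (`events`, `id-counts`, `extractId`) that stand in for my actual topics and parsing logic:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class IdPairStream {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "id-pair-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();

        // Read the raw events (assumed to be plain strings here).
        KStream<String, String> events =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-key each event by its id and set the value to 1, then write the pair
        // to the topic the Elasticsearch sink will read from.
        KStream<String, Integer> idPairs =
            events.map((key, value) -> KeyValue.pair(extractId(value), 1));
        idPairs.to("id-counts", Produced.with(Serdes.String(), Serdes.Integer()));

        new KafkaStreams(builder.build(), props).start();
    }

    // Placeholder: pull the id out of the event payload (e.g. parse JSON).
    private static String extractId(String event) {
        return event;
    }
}
```

In practice the sink connector will likely expect a structured value (JSON or Avro with a schema) rather than a bare integer, so the value side would normally be a small document such as `{"id": ..., "counter": 1}`, but the shape of the pipeline is the same.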
I was hoping to use the Kafka Connect Elasticsearch sink for this, but it doesn't seem straightforward, if it is possible at all. I can see that adding records to ES works, but merging with existing records is something I haven't figured out yet. Is this already possible, and if so, how? And if not, is it planned for an upcoming release?
I forked the datamountaineer ES sink connector to support upserts. With it you can specify a PK and run an update with docAsUpsert into ES. You can grab the project and build the JAR from my GitHub fork.
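As a rough illustration only: the datamountaineer connectors are configured with KCQL, and an upsert keyed on the id would look something like the sketch below. The exact property keys and the connector class name vary between connector versions and may differ in the fork, and the topic/index names are placeholders, so check the fork's README for the real ones:

```properties
name=elastic-sink
# Connector class name is an assumption; take it from the fork's documentation.
connector.class=com.datamountaineer.streamreactor.connect.elastic.ElasticSinkConnector
topics=id-counts
connect.elastic.url=localhost:9300
connect.elastic.cluster.name=elasticsearch
# UPSERT ... PK id tells the connector to update by id instead of always inserting.
connect.elastic.sink.kcql=UPSERT INTO id-counts SELECT * FROM id-counts PK id
```

The docAsUpsert behaviour corresponds to Elasticsearch's update API with `"doc_as_upsert": true`: the document is created when the id is absent and applied as a partial update when it already exists.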