
Kafka Streams: Custom TimestampExtractor for aggregation

I am building a pretty straightforward KafkaStreams demo application, to test a use case.

I am not able to upgrade the Kafka broker I am using (which is currently on version 0.10.0), and there are several messages written by a pre-0.10.0 Producer, so I am using a custom TimestampExtractor, which I add as a default to the config in the beginning of my main class:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);
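A simplified sketch of what such an extractor can look like is below; the GenericRecord value type and the "timestamp" field name are illustrative assumptions, not details from the question:

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class GenericRecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        // Records written by a pre-0.10.0 producer carry no broker timestamp
        // (record.timestamp() is -1), so read one from the payload instead.
        if (record.value() instanceof GenericRecord) {
            Object ts = ((GenericRecord) record.value()).get("timestamp");
            if (ts instanceof Long) {
                return (Long) ts;
            }
        }
        // Fall back to wall-clock time so processing can continue.
        return System.currentTimeMillis();
    }
}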

When consuming from my source topic, this works perfectly fine. But when using an aggregation operator, I run into an exception because the FailOnInvalidTimestamp implementation of TimestampExtractor is used instead of the custom implementation when consuming from the internal aggregation topic.

The code of the Streams app looks something like this:

...

KStream<String, MyValueClass> clickStream = streamsBuilder
        .stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

// Re-key by customer id, group, and count clicks per 1-minute window.
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
        .map((key, value) -> new KeyValue<>(value.getId(), value))
        .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
        .count();
...

The Exception I'm encountering is the following:

    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp. 
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

Now the question is: Is there any way I can make Kafka Streams use the custom TimestampExtractor when reading from the internal aggregation topic (optimally while still using the Streams DSL)?

asked Jan 24 '18 by haukeh


People also ask

What is aggregate in Kafka Streams?

Aggregate the values of records in this stream by the grouped key. Records with a null key or value are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values.
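For illustration, a sketch building on the question's snippet that aggregates MyValueClass records into a result of a different type (the getDuration() accessor is hypothetical):

// Sketch: aggregating MyValueClass records into a running Long total per id.
KTable<Long, Long> totalDurationById = clickStream
        .map((key, value) -> new KeyValue<>(value.getId(), value))
        .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
        .aggregate(
                () -> 0L,                                            // initializer: start at zero
                (id, click, total) -> total + click.getDuration(),   // add each record's contribution
                Materialized.with(Serdes.Long(), Serdes.Long()));    // result serdes differ from the input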

How do you make a KTable from a KStream?

You'll take an existing KStream object and use the toTable() method to convert it into a KTable . This method (added in Apache Kafka 2.5) allows you to simply convert a record stream to a changelog stream. In this case you've materialized the KTable , so it's available for you to use with Interactive Queries.
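A minimal sketch of that conversion, building on the question's clickStream (Kafka Streams 2.5+; the store name is illustrative):

// Sketch: converting a record stream into a changelog stream and
// materializing it so the table is queryable via Interactive Queries.
KTable<String, MyValueClass> clicksTable = clickStream
        .toTable(Materialized.as("clicks-table-store"));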

How is KTable stored in Kafka?

Internally, a KTable is implemented using RocksDB and a topic in Kafka. RocksDB stores the current data of the table (note that RocksDB is not an in-memory store and can write to disk). At the same time, each update to the KTable (i.e., to RocksDB) is written into the corresponding Kafka topic.
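As an illustration of that duality, naming the store in the question's topology makes both the local RocksDB store and its backing changelog topic easy to identify (a sketch; the store name is illustrative):

// Sketch: Kafka Streams keeps the current counts in a local RocksDB store and
// logs every update to the topic "<application.id>-clicks-per-customer-changelog".
KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
        .map((key, value) -> new KeyValue<>(value.getId(), value))
        .groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
        .count(Materialized.<Long, Long, WindowStore<Bytes, byte[]>>as("clicks-per-customer"));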


1 Answer

You cannot change the timestamp extractor (as of v1.0.0). This is not allowed for correctness reasons.

But I am really wondering how a record with timestamp -1 got written into this topic in the first place. Kafka Streams uses the timestamp that was provided by your custom extractor when writing the record. Also note that KafkaProducer does not allow writing records with a negative timestamp.

Thus, the only explanation I can think of is that some other producer did write into the repartition topic -- and this is not allowed... Only Kafka Streams should write into the repartition topic.

I guess you will need to delete this topic and let Kafka Streams recreate it to get back into a clean state.

From the discussion/comment of the other answer:

You need the 0.10+ message format to work with Kafka Streams. If you upgrade your brokers but keep the 0.9 message format or older, Kafka Streams might not work as expected.

answered Sep 23 '22 by Matthias J. Sax