
Kafka - problems with TimestampExtractor

I use org.apache.kafka:kafka-streams:0.10.0.1

I'm working with a time-series stream, and the processor I attach with KStream.process() never has its punctuate() method triggered (see here for reference).

In a KafkaStreams config I'm passing in this param (among others):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

Here, EventTimeExtractor is a custom timestamp extractor (that implements org.apache.kafka.streams.processor.TimestampExtractor) to extract the timestamp information from JSON data.
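For illustration, the parsing step of such an extractor might look like the sketch below. The actual EventTimeExtractor is not shown in the question, so everything here is an assumption: the class name EventTimeParser, the JSON field name "timestamp", epoch-millisecond values, and the fallback behaviour are all hypothetical. It uses a regex rather than a JSON library and leaves out the Kafka interface so that it runs with the JDK alone.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: pulls an epoch-millisecond timestamp out of a JSON string. */
class EventTimeParser {
    // Assumed field name "timestamp" holding an integer; adjust to the real payload.
    private static final Pattern TS_FIELD =
            Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

    /** Returns the parsed timestamp, or fallbackMs if the field is missing or malformed. */
    static long parseEventTime(String json, long fallbackMs) {
        if (json == null) {
            return fallbackMs;
        }
        Matcher m = TS_FIELD.matcher(json);
        if (!m.find()) {
            return fallbackMs;
        }
        try {
            return Long.parseLong(m.group(1));
        } catch (NumberFormatException e) {
            // Digits matched but the number does not fit in a long.
            return fallbackMs;
        }
    }
}
```

In a real extractor, the extract() method of the TimestampExtractor implementation would presumably hand the record's value to a helper like this and return the result.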

I would expect this to call my object (derived from TimestampExtractor) when each new record is pulled in. The stream in question carries 2 * 10^6 records / minute. I have punctuate() set to 60 seconds and it never fires. I know the data passes this span very frequently since it's pulling old values to catch up.

In fact it never gets called at all.

  • Is this the wrong approach to setting timestamps on KStream records?
  • Is this the wrong way to declare this configuration?

ethrbunny asked Sep 16 '16 15:09



2 Answers

Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and with processing-time (wall clock time) behavior. So you can pick whichever behavior you prefer.

Your setup seems correct to me.

What you need to be aware of: as of Kafka 0.10.0, the punctuate() method operates on stream-time. By default, i.e. with the default timestamp extractor, stream-time means event-time. Stream-time advances only when new data records come in, and how far it advances is determined by the timestamps of those records.

For example:

  • Let's assume you have set punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). Now, if no data is received for the next 5 minutes, punctuate() will not be called at all, even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time advances only when new data records are received.
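
The example above can be modelled with a small toy class. This is a sketch of the idea only, not the Kafka Streams implementation: the class StreamTimePunctuator and its methods are hypothetical, and firing once per missed interval when a late record arrives is an assumption of this model rather than a statement about Kafka's behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of stream-time punctuation (hypothetical, not Kafka code). */
class StreamTimePunctuator {
    private final long intervalMs;
    private long nextPunctuateAt = -1L; // unset until the first record arrives
    private final List<Long> fired = new ArrayList<>();

    StreamTimePunctuator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Stream-time moves only here, driven by the record's timestamp. */
    void onRecord(long recordTimestampMs) {
        if (nextPunctuateAt < 0) {
            // First record: schedule the first punctuation one interval later.
            nextPunctuateAt = recordTimestampMs + intervalMs;
            return;
        }
        // Assumption of this model: fire once for every interval that has elapsed.
        while (recordTimestampMs >= nextPunctuateAt) {
            fired.add(nextPunctuateAt);
            nextPunctuateAt += intervalMs;
        }
    }

    /** Wall-clock time passing without records: deliberately does nothing. */
    void onWallClockAdvance(long elapsedMs) {
        // Stream-time is unaffected by the wall clock.
    }

    /** Stream-time values at which punctuation fired so far. */
    List<Long> fired() {
        return fired;
    }
}
```

With a 60 * 1000 ms interval, calling onRecord(0) and then onWallClockAdvance(5 * 60 * 1000) leaves fired() empty; punctuation only happens once a later record with a timestamp of at least 60000 is fed in.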

Might this be causing the behavior you are seeing?

Looking ahead: There's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to trigger it not only based on stream-time (which defaults to event-time) but also based on processing-time.

Michael G. Noll answered Sep 23 '22 21:09


Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor):" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters

I'm not sure why your custom timestamp extractor is not used. Have a look at org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be something like

TimestampExtractor timestampExtractor1 = (TimestampExtractor)config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);

Check if your custom extractor is picked up there or not...

Matthias J. Sax answered Sep 24 '22 21:09