I'm using org.apache.kafka:kafka-streams:0.10.0.1.
I'm attempting to work with a time-series-based stream in which KStream.process() never seems to trigger punctuate(). (see here for reference)
In my KafkaStreams config I'm passing in this param (among others):

    config.put(
        StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        EventTimeExtractor.class.getName());
Here, EventTimeExtractor is a custom timestamp extractor (implementing org.apache.kafka.streams.processor.TimestampExtractor) that extracts the timestamp information from the JSON data.
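For illustration, a minimal sketch of what such an extractor can look like against the 0.10.0 TimestampExtractor interface; the JSON handling below (a Map value carrying an epoch-millis "timestamp" field) is an assumption, not my actual code:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Sketch only: assumes the record value was already deserialized to a Map
    // holding an epoch-millis "timestamp" field; falls back to the record's
    // built-in timestamp if that field is missing or malformed.
    public class EventTimeExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            Object value = record.value();
            if (value instanceof java.util.Map) {
                Object ts = ((java.util.Map<?, ?>) value).get("timestamp");
                if (ts instanceof Number) {
                    return ((Number) ts).longValue();
                }
            }
            return record.timestamp();
        }
    }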
I would expect this to call my object (derived from TimestampExtractor) as each new record is pulled in. The stream in question handles 2 * 10^6 records per minute. I have punctuate() set to 60 seconds, and it never fires; I know the data crosses that span very frequently, since it's pulling old values to catch up. In fact, it never gets called at all.
Update Nov 2017: Kafka Streams in Kafka 1.0 now supports punctuate() with both stream-time and processing-time (wall-clock time) behavior, so you can pick whichever behavior you prefer.
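For example, the 1.0 Processor API lets you schedule a wall-clock punctuation roughly like this (a sketch; the 60-second interval is just illustrative):

    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.PunctuationType;

    // Kafka 1.0+ sketch: inside Processor#init(), schedule a punctuation that
    // fires every 60 seconds of wall-clock time, even if no records arrive.
    @Override
    public void init(ProcessorContext context) {
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                // periodic work goes here; 'timestamp' is wall-clock millis
            });
    }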
Your setup seems correct to me.
What you need to be aware of: as of Kafka 0.10.0, the punctuate() method operates on stream-time (by default, i.e. based on the default timestamp extractor, stream-time means event-time). Stream-time is only advanced when new data records come in, and how far it advances is determined by the timestamps associated with those records.
For example: assume you have scheduled punctuate() to be called every 1 minute = 60 * 1000 ms (note: 1 minute of stream-time). Now, if it happens that no data is received for the next 5 minutes, punctuate() will not be called at all, even though you might expect it to be called 5 times. Why? Again, because punctuate() depends on stream-time, and stream-time is only advanced based on newly received data records.

Might this be causing the behavior you are seeing?
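To make this concrete, here is roughly how punctuation is wired up in the 0.10.0 Processor API (a sketch, not your actual code):

    import org.apache.kafka.streams.processor.ProcessorContext;

    // Kafka 0.10.0 Processor API sketch: schedule() takes only an interval,
    // and that interval is measured in stream-time, not wall-clock time.
    @Override
    public void init(ProcessorContext context) {
        context.schedule(60 * 1000L); // punctuate() every 1 minute of stream-time
    }

    @Override
    public void punctuate(long timestamp) {
        // Invoked only after the timestamps of incoming records have advanced
        // stream-time by at least another 60 seconds.
    }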
Looking ahead: there's already an ongoing discussion in the Kafka project on how to make punctuate() more flexible, e.g. to trigger it not only based on stream-time (which defaults to event-time) but also based on processing-time.
Your approach seems to be correct. Compare the paragraph "Timestamp Extractor (timestamp.extractor)" in http://docs.confluent.io/3.0.1/streams/developer-guide.html#optional-configuration-parameters
Not sure why your custom timestamp extractor is not used. Have a look into org.apache.kafka.streams.processor.internals.StreamTask. In the constructor there should be something like

    TimestampExtractor timestampExtractor = config.getConfiguredInstance("timestamp.extractor", TimestampExtractor.class);
Check if your custom extractor is picked up there or not...
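One quick way to check this from your side (my suggestion, not something in StreamTask itself): add temporary logging to your extractor so you can see whether Streams ever instantiates or calls it:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Temporary debug instrumentation: if neither message ever appears in the
    // logs, Streams is not picking the custom extractor up at all.
    public class EventTimeExtractor implements TimestampExtractor {
        public EventTimeExtractor() {
            System.out.println("EventTimeExtractor instantiated");
        }

        @Override
        public long extract(ConsumerRecord<Object, Object> record) {
            System.out.println("extract() called for offset " + record.offset());
            // ... real timestamp extraction goes here ...
            return record.timestamp();
        }
    }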