I'm using FluentD (v0.12, the latest stable version) to send messages to Kafka. But FluentD uses an old KafkaProducer, so the record timestamp is always set to -1. Thus I have to use the WallclockTimestampExtractor to set the timestamp of the record to the point in time when the message arrives in Kafka.
Is there a Kafka Streams-specific solution?
The timestamp I'm really interested in is sent by fluentd within the message:
"timestamp":"1507885936","host":"V.X.Y.Z."
Record representation in Kafka:
offset = 0, timestamp = -1, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
I would like to have a record like this in Kafka:
offset = 0, timestamp = 1507885936, key = null, value = {"timestamp":"1507885936","host":"V.X.Y.Z."}
My workaround would look like this:
write a consumer to extract the timestamp (https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
write a producer to produce a new record with the timestamp set (ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value))
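A rough sketch of that workaround might look like the following (topic names, the broker address, and the naive JSON parsing are only placeholders, not something I have running):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CopyWithTimestamp {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "timestamp-copier");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // extract the "timestamp" field from the JSON value, e.g. {"timestamp":"1507885936",...}
                    String value = record.value();
                    int start = value.indexOf("\"timestamp\":\"") + "\"timestamp\":\"".length();
                    long ts = Long.parseLong(value.substring(start, value.indexOf('"', start)));
                    // re-publish the record with the extracted timestamp set as record metadata
                    producer.send(new ProducerRecord<>("output-topic", null, ts, record.key(), value));
                }
            }
        }
    }
}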
I would prefer a KafkaStreams solution, if there is one.
You can write a very simple Kafka Streams application like this:
KStreamBuilder builder = new KStreamBuilder();
builder.stream("input-topic").to("output-topic");
and configure the application with a custom TimestampExtractor that extracts the timestamp from the record and returns it.
Kafka Streams will use the returned timestamps when writing the records back to Kafka.
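For illustration, a minimal end-to-end sketch might look like this (the extractor class name, topic names, broker address, and the naive JSON parsing are my placeholders, not part of the answer; it assumes a 0.11/1.0-era client where the default.timestamp.extractor config is available):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CopyWithTimestampApp {

    // hypothetical extractor that pulls the "timestamp" field out of the fluentd JSON payload
    public static class FluentdTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            String value = (String) record.value(); // e.g. {"timestamp":"1507885936","host":"V.X.Y.Z."}
            int start = value.indexOf("\"timestamp\":\"") + "\"timestamp\":\"".length();
            long ts = Long.parseLong(value.substring(start, value.indexOf('"', start)));
            // the fluentd value looks like epoch seconds; multiply by 1000 if you want milliseconds
            return ts;
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "copy-with-timestamp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FluentdTimestampExtractor.class);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream(Serdes.String(), Serdes.String(), "input-topic")
               .to(Serdes.String(), Serdes.String(), "output-topic");

        new KafkaStreams(builder, props).start();
    }
}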
Note: if you have out-of-order data -- ie, timestamps are not strictly ordered -- the result will contain out-of-order timestamps, too. Kafka Streams uses the returned timestamps when writing back to Kafka (ie, whatever the extractor returns is used as the record metadata timestamp). Note that on write, the timestamp from the currently processed input record is used for all generated output records -- this holds for version 1.0 but might change in future releases.
Update:
In general, you can modify timestamps via the Processor API. When calling context.forward(), you can set the output record timestamp by passing To.all().withTimestamp(...) as a parameter to forward().
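A minimal sketch of that approach, assuming Kafka 2.0+ (where forward(key, value, To) and To.all().withTimestamp(...) are available; class names, topic names, and the JSON parsing are placeholders):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class RestampApp {

    // hypothetical processor that forwards each record with the timestamp taken from the message body
    static class RestampProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            // parse the "timestamp" field out of e.g. {"timestamp":"1507885936","host":"V.X.Y.Z."}
            int start = value.indexOf("\"timestamp\":\"") + "\"timestamp\":\"".length();
            long ts = Long.parseLong(value.substring(start, value.indexOf('"', start)));
            // forward the unchanged key/value, but stamp the output record with the extracted timestamp
            context().forward(key, value, To.all().withTimestamp(ts));
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restamp-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addProcessor("restamp", RestampProcessor::new, "source");
        topology.addSink("sink", "output-topic", "restamp");

        new KafkaStreams(topology, props).start();
    }
}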