Is it possible to have a Kafka Streams app that runs through all the data in a topic and then exits?
For example, I'm producing data into topics based on date. The consumer gets kicked off by cron, runs through all the available data, and then... does what? I don't want it to sit and wait for more data. Just assume it's all there and then exit gracefully.
Possible?
Apache Kafka is an open-source streaming platform that enables the development of applications that ingest a high volume of real-time data. It was originally built by the geniuses at LinkedIn and is now used at Netflix, Pinterest, and Airbnb, to name a few.
Kafka Streams is an API for writing client applications that transform data in Apache Kafka. You usually do this by publishing the transformed data onto a new topic. The data processing itself happens within your client application, not on a Kafka broker. Kafka Connect is an API for moving data into and out of Kafka.
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in an Apache Kafka® cluster. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.
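To make that concrete, here is a minimal sketch of a Streams app that reads one topic, transforms each value, and publishes the result to another topic. The topic names, application ID, and bootstrap server are assumptions for illustration:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import java.util.Properties;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-transform-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("input-topic")
           .mapValues(value -> value.toUpperCase())   // the transformation step
           .to("output-topic");                       // publish onto a new topic

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();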
You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (state.dir configuration parameter).
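As a quick sketch of the cleanUp() approach (reusing a builder and props set up as in the example above): cleanUp() must be called while the instance is not running, for example right before start():

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.cleanUp();   // deletes this application ID's local state directory
    streams.start();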
In Kafka Streams (as with other stream processing solutions), there is no "end of data", because it is stream processing in the first place -- not batch processing.
Nevertheless, you could watch the "lag" of your Kafka Streams application and shut it down if there is no lag (lag is the number of not-yet-consumed messages).
For example, you can use bin/kafka-consumer-groups.sh to check the lag of your Streams application (the application ID is used as the consumer group ID). If you want to embed this in your Streams application, you can use kafka.admin.AdminClient to get consumer group information.
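Here is a sketch of that idea in code, using the newer org.apache.kafka.clients.admin.AdminClient rather than the older kafka.admin.AdminClient mentioned above (the listOffsets call requires Kafka 2.5+); the group ID and bootstrap server are assumptions:

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    // Hypothetical helper: returns true once the Streams app's consumer group
    // (application ID == group ID) has consumed everything that was produced.
    static boolean hasZeroLag(String groupId) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group has committed so far, per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId)
                         .partitionsToOffsetAndMetadata().get();
            // Latest end offsets of the same partitions
            Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            long totalLag = admin.listOffsets(latest).all().get().entrySet().stream()
                    .mapToLong(e -> e.getValue().offset()
                                    - committed.get(e.getKey()).offset())
                    .sum();
            return totalLag == 0;
        }
    }

You would run a check like this periodically and call streams.close() once it returns true; in practice it is safer to require the lag to stay at zero for a while, since a momentary zero lag does not guarantee the producers are finished.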
You can create a consumer and then, once it stops pulling in data, call consumer.close(). Or, if you want to poll again in the future, just call consumer.pause() and call consumer.resume() later.
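For instance, a minimal sketch of the pause/resume variant, assuming consumer is an already-subscribed KafkaConsumer:

    // Pause every currently assigned partition; subsequent poll() calls
    // return no records but keep the consumer and its group membership alive.
    consumer.pause(consumer.assignment());

    // ... later, when new data is expected, resume the paused partitions:
    consumer.resume(consumer.paused());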
One way to do this is within the consumer poll block, such as:

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    if (records.isEmpty()) {
        consumer.close();
    }

Keep in mind that poll returns ConsumerRecords<K,V>, which implements the Iterable interface, so you can iterate over the individual ConsumerRecord<K,V> entries it contains.
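Putting it together, here is a self-contained sketch of a cron-driven consumer that drains a topic and exits on the first empty poll; the topic name, group ID, and bootstrap server are assumptions:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "daily-batch-consumer");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("events-2024-01-01"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            if (records.isEmpty()) {
                break;   // assume no more data is coming; exit gracefully
            }
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());   // process the record (printing as a stand-in)
            }
        }
    }   // consumer.close() happens automatically via try-with-resources

The poll timeout doubles as the "end of data" heuristic here: if nothing arrives within five seconds, the run assumes the topic is drained and exits.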