
Stop a Kafka Streams app

Is it possible to have a Kafka Streams app that runs through all the data in a topic and then exits?

Example I'm producing data into topics based on date. The consumer gets kicked off by cron, runs through all the available data and then .. does what? I don't want it to sit and wait for more data. Just assume it's all there and then exit gracefully.

Possible?

asked Aug 19 '16 by ethrbunny


People also ask

Does Netflix use Kafka?

Apache Kafka is an open-source streaming platform that enables the development of applications that ingest a high volume of real-time data. It was originally built by the geniuses at LinkedIn and is now used at Netflix, Pinterest and Airbnb to name a few.

What is difference between Kafka connect and Kafka Streams?

Kafka Streams is an API for writing client applications that transform data in Apache Kafka. You usually do this by publishing the transformed data onto a new topic. The data processing itself happens within your client application, not on a Kafka broker. Kafka Connect is an API for moving data into and out of Kafka.

What Kafka Streams application?

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in an Apache Kafka® cluster. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.

How do I reset Kafka Streams?

You can either use the API method KafkaStreams#cleanUp() in your application code or manually delete the corresponding local state directory (the state.dir configuration parameter).


2 Answers

In Kafka Streams (as in other stream processing solutions), there is no "end of data", because it is stream processing in the first place -- not batch processing.

Nevertheless, you could watch the "lag" of your Kafka Streams application and shut it down when there is no lag (lag is the number of not-yet-consumed messages).

For example, you can use bin/kafka-consumer-groups.sh to check the lag of your Streams application (the application ID is used as the consumer group ID). If you want to embed this in your Streams application, you can use kafka.admin.AdminClient to get consumer group information.
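As a sketch of what that shutdown decision could look like once you have the offsets in hand (the class and method names below are hypothetical, not part of the Kafka API): lag per partition is the log-end offset minus the group's committed offset, summed over all partitions.

```java
import java.util.Map;

// Hypothetical helper, not part of the Kafka API: computes total group lag
// from two maps of partition -> offset (as obtained, e.g., via an admin client).
public class LagCheck {
    // Lag per partition = log-end offset minus committed offset; sum over partitions.
    public static long totalLag(Map<String, Long> endOffsets,
                                Map<String, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0L, e.getValue() - committed);
        }
        return lag;
    }

    public static void main(String[] args) {
        // topic-0 fully consumed, topic-1 has 2 messages outstanding -> total lag 2
        long lag = totalLag(
            Map.of("topic-0", 100L, "topic-1", 50L),
            Map.of("topic-0", 100L, "topic-1", 48L));
        System.out.println(lag); // prints 2
        if (lag == 0) {
            // streams.close();  // no lag left: shut the Streams application down
        }
    }
}
```

When total lag reaches zero, the cron-driven process can call KafkaStreams#close() and exit.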

answered Sep 22 '22 by Matthias J. Sax


You can create a consumer and then, once it stops returning data, call consumer.close(). Or, if you want to poll again in the future, call consumer.pause() and consumer.resume() later (both take the set of partitions to pause or resume, e.g. consumer.assignment()).

One way to do this is within the consumer poll loop, for example:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if (records.isEmpty()) {
    consumer.close();
}

Keep in mind that poll returns ConsumerRecords<K,V>, which implements Iterable<ConsumerRecord<K,V>>.
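A single empty poll does not necessarily mean the topic is exhausted (the fetch may simply have timed out before any records arrived), so a common refinement is to close only after several consecutive empty polls. A minimal sketch of that bookkeeping (the class name is hypothetical):

```java
// Hypothetical helper: decide when to close the consumer after repeated empty polls.
public class IdleShutdown {
    private final int maxEmptyPolls;
    private int consecutiveEmpty = 0;

    public IdleShutdown(int maxEmptyPolls) {
        this.maxEmptyPolls = maxEmptyPolls;
    }

    // Call after every poll with the number of records received;
    // returns true when it is time to call consumer.close().
    public boolean shouldClose(int recordCount) {
        consecutiveEmpty = (recordCount == 0) ? consecutiveEmpty + 1 : 0;
        return consecutiveEmpty >= maxEmptyPolls;
    }

    public static void main(String[] args) {
        IdleShutdown idle = new IdleShutdown(3);
        System.out.println(idle.shouldClose(5));  // false: records arrived, counter reset
        System.out.println(idle.shouldClose(0));  // false: 1 empty poll
        System.out.println(idle.shouldClose(0));  // false: 2 empty polls
        System.out.println(idle.shouldClose(0));  // true: 3 empty polls in a row
    }
}
```

In the poll loop above, you would call shouldClose(records.count()) after each poll and close the consumer once it returns true.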

answered Sep 26 '22 by TheM00s3