Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to finish kafka consumer safety?(Is there meaning to call thread#join inside shutdownHook ? )

I am reading this article

And here code to finish the consumer thread:

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup(); 1
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

As I understand ShutdownHook invokes when all non-daemon threads have finished but before the process was destroyed by OS.
1. From my point of view mainThread.join() is useless. Main thread always will finished to the moment of ShutdownHook execution. Is it correct or I misunderstood something?
2. Actually I don't understand why do we need to await main thred? we need to await close method execution?

P.S.

Book provides following main method code:

try {
            // looping until ctrl-c, the shutdown hook will cleanup on exit
            while (true) {
                ConsumerRecords<String, String> records =
                  movingAvg.consumer.poll(1000);
                System.out.println(System.currentTimeMillis() + "--  waiting for data...");
                for (ConsumerRecord<String, String> record :
                  records) {
                    System.out.printf("offset = %d, key = %s,
                      value = %s\n",
                      record.offset(), record.key(),
                      record.value());
                }
                for (TopicPartition tp: consumer.assignment())
                    System.out.println("Committing offset at position:" + consumer.position(tp));
                movingAvg.consumer.commitSync();
            }
        } catch (WakeupException e) {
            // ignore for shutdown 2
        } finally {
            consumer.close(); 3
            System.out.println("Closed consumer and we are done");
        }
    }
like image 969
gstackoverflow Avatar asked Nov 08 '25 07:11

gstackoverflow


1 Answers

You do consumer.wakeup() to interrupt current consumer's operation (that might be long-running (e.g. a poll) or even blocked (what could happen in case of beginningOffsets(...).

The mainThread.join() is put in there to ensure that main thread actually finishes and is not shut down in middle of processing after wakeup. Please remember that shutdownHook is responsible for handling interrupts as well, not only ordinary program shutdown.

So if you interrupt with e.g. ctrl-C:

1. shutdown hook gets called
2. main thread is still running, most often waiting for data in `poll`
3. shutdown hook `wakeup`-s the main thread
4. main thread enters the exception handler, breaks the loop
5. main thread closes the consumer with `.close()`
6. shutdown hook waits for 5. and finishes

Without waiting you might have not performed the consumer shutdown steps in steps 4 & 5.

like image 157
Adam Kotwasinski Avatar answered Nov 10 '25 00:11

Adam Kotwasinski



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!