I am developing a simple Kafka Stream application which extracting messages from a topic and put it into another topic after transformation. I am using Intelij for my development.
When I debug/run this application, it works perfect if my IDE and the Kafka Server sitting in the SAME machine
(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)
However, when I try to use another machine to do the development
(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 where XXX.XXX.XXX is the ip address of my Kafka),
the debug process run without problem at the 1st time. However, when I run 2nd time after resetting the offset, I received the following error:
ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:
If I changed my_application_id
as my_application_id2
, and run it, it works again at the 1st time but receiving error again if I run it again.
I have the following code in my last sentence in my application:
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
Any advice how to solve this problem?
UPDATE:
I have reviewed the state directory created in my development machine (Windows Platform) and if I delete these directory manually before running 2nd time, no error found. I have tried to run my IDE as Administrator because I think this could be something about the permission on the folder. However, this doesn't help.
Full stack for reference:
INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109) INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110) INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281) Disconnected from the target VM, address: '127.0.0.1:16552', transport: 'socket' Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45) Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) ... 3 more at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)
UPDATE 2 : After another detailed check, the line below throwing IOException
Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {
This line is located at kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class
May be this is the problem with Windows system (sorry that I am not an experienced JAVA programmer).
For googlers..
I'm currently using this Scala code for helping windows guys to handle deletion of state store.
if (System.getProperty("os.name").toLowerCase.contains("windows")) {
logger.info("WINDOWS OS MODE - Cleanup state store.")
try {
FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
} catch {
case e: Exception => logger.error(e.toString)
}
}
else {
streams.cleanUp()
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With