I have a piece of code that monitors a directory for new files. Whenever a file is added to the directory, its contents are read and published to Kafka, and then the file is deleted.
This works when I make a single request, but as soon as I subject my code to 5 or 10 user requests from JMeter, the contents are published to Kafka successfully but the code is unable to delete the file. I get a FileSystemException with the message "The process cannot access the file because it is being used by another process."
I guess there is some concurrency issue that I am unable to see.
public void monitor() throws IOException, InterruptedException {
    Path faxFolder = Paths.get(TEMP_FILE_LOCATION);
    WatchService watchService = FileSystems.getDefault().newWatchService();
    faxFolder.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
    boolean valid = true;
    do {
        WatchKey watchKey = watchService.take();
        for (WatchEvent<?> event : watchKey.pollEvents()) {
            if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
                String fileName = event.context().toString();
                publishToKafka(new File(TEMP_FILE_LOCATION + fileName).toPath(), "topic");
            }
        }
        valid = watchKey.reset();
    } while (valid);
}

private void publishToKafka(Path path, String topic) {
    try (BufferedReader reader = Files.newBufferedReader(path)) {
        String input = null;
        while ((input = reader.readLine()) != null) {
            kafkaProducer.publishMessageOnTopic(input, topic);
        }
    } catch (IOException e) {
        LOG.error("Could not read buffered file to send message on kafka.", e);
    } finally {
        try {
            Files.deleteIfExists(path); // This is where I get the exception
        } catch (IOException e) {
            LOG.error("Problem in deleting the buffered file {}.", path.getFileName(), e);
        }
    }
}
Exception log:
java.nio.file.FileSystemException: D:\upload\notif-1479974962595.csv: The process cannot access the file because it is being used by another process.
at sun.nio.fs.WindowsException.translateToIOException(Unknown Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
at sun.nio.fs.WindowsException.rethrowAsIOException(Unknown Source)
at sun.nio.fs.WindowsFileSystemProvider.implDelete(Unknown Source)
at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(Unknown Source)
at java.nio.file.Files.deleteIfExists(Unknown Source)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.publishToKafka(MonitorDirectory.java:193)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.sendData(MonitorDirectory.java:125)
at com.panasonic.mdw.core.utils.MonitorDirectory$FileContentPublisher.run(MonitorDirectory.java:113)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Looking at your code, it seems that while one thread has picked up a file for publishing, another thread picks up the same file again. That would explain why neither of them is able to delete it, so this looks like a concurrency issue. You should redesign the code by separating the steps that can run concurrently from those that cannot. The steps in the entire process are:
Also, the moment a file is selected, you can read it into a buffer, delete it, and then continue with publishing. This makes sure that the main thread does not assign the same file to some other thread.
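The read, delete, then publish order suggested above could be sketched roughly as follows. This is only an illustration under assumptions: ReadDeletePublish and drainAndPublish are made-up names, and the Consumer<String> is a hypothetical stand-in for the kafkaProducer.publishMessageOnTopic(input, topic) call from the question.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the original code.
class ReadDeletePublish {

    // Reads the whole file into memory, deletes it, and only then publishes.
    // 'publisher' stands in for the Kafka call used in the question.
    static int drainAndPublish(Path path, Consumer<String> publisher) throws IOException {
        // readAllLines opens the file, reads it, and closes it before returning,
        // so this code holds no handle on the file when the delete runs.
        List<String> lines = Files.readAllLines(path, StandardCharsets.UTF_8);
        Files.deleteIfExists(path);
        for (String line : lines) {
            publisher.accept(line);
        }
        return lines.size();
    }
}
```

It would be called as drainAndPublish(path, line -> kafkaProducer.publishMessageOnTopic(line, topic)). Note the trade-off: if publishing fails after the delete, the file contents are gone, and the delete can still fail if the process that wrote the file has not closed it yet.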
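It can help to add a short sleep when handling WatchService events, so that the process writing the file has time to finish:
if (StandardWatchEventKinds.ENTRY_CREATE.equals(event.kind())) {
    try {
        // Give the writer time to finish creating the file and release it
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the calling code can react to it
        Thread.currentThread().interrupt();
    }
    // now do your intended jobs ...
}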
I needed to add the sleep; otherwise it would not work for multiple requests, and I used to get the error:
The process cannot access the file because it is being used by another process
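As an alternative to a fixed Thread.sleep(3000), one could poll until the file stops growing. A rough sketch, assuming that an unchanged size between two checks means the writer is done (a heuristic, not a guarantee, since a slow writer may pause longer than the poll interval). FileReadiness, waitUntilStable, pollMillis and maxPolls are names made up for this illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the original code.
class FileReadiness {

    // Returns true once two consecutive size checks agree,
    // or false if that has not happened after maxPolls checks.
    static boolean waitUntilStable(Path path, long pollMillis, int maxPolls)
            throws IOException, InterruptedException {
        long previous = -1L; // no size observed yet
        for (int i = 0; i < maxPolls; i++) {
            long current = Files.size(path);
            if (current == previous) {
                return true; // size unchanged since the last check
            }
            previous = current;
            Thread.sleep(pollMillis);
        }
        return false;
    }
}
```

For example, waitUntilStable(path, 200, 15) waits at most about 3 seconds, but returns much earlier for small files that are already complete.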
I had a similar problem, described in the thread "Multithreading on Queue when trying to upload a dynamically created files to a Queue service", and it took me 2 days to resolve. Thanks to Holger, who answered as above that the locking may occur because the file has not been fully created when another thread reads it. That saved me a lot of time.
My initial solution, which I found in many places on the internet, was:
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
//queueUploadFile(child);
if (kind == ENTRY_CREATE) {
    uploadToQueue(this.queueId, child);
}
I changed it to:
WatchEvent<Path> ev = cast(event);
Path name = ev.context();
Path child = dir.resolve(name);
//queueUploadFile(child);
if (kind == ENTRY_MODIFY) {
    uploadToQueue(this.queueId, child);
}
And everything works perfectly. To handle the multiple ENTRY_MODIFY events that "somehow" fire (which would upload duplicate files), I delete the file inside the uploadToQueue() method once it has been uploaded.
I hope my approach, based on the contribution above, will also help others with a similar problem.
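Another way to guard against duplicate events for the same file, or two worker threads picking up the same file, might be to track which paths are currently being processed. A minimal sketch using a concurrent set; InFlightFiles, tryClaim and release are hypothetical names for illustration, not part of the code above.

```java
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original code.
class InFlightFiles {

    // Thread-safe set of paths that some thread is currently handling
    private final Set<Path> inFlight = ConcurrentHashMap.newKeySet();

    // Returns true only for the first caller that claims this path;
    // Set.add is atomic here, so two threads cannot both succeed.
    boolean tryClaim(Path path) {
        return inFlight.add(path.toAbsolutePath().normalize());
    }

    // Call once the file has been uploaded and deleted (or has failed).
    void release(Path path) {
        inFlight.remove(path.toAbsolutePath().normalize());
    }
}
```

The event loop would then call uploadToQueue(this.queueId, child) only when tryClaim(child) returns true, and release(child) in a finally block afterwards.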