Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

ExecutorService - Unable to stop Thread from ServletContextListener when the context is destroyed

I am starting a Thread from ServletContextListener when the context is initialized and trying to stop it when the context is destroyed. The class is:

public enum BlinkLedTask {

    INSTANCE;

    private Logger logger = RpiLogger.getLogger(getClass());

    private Task task;
    private ExecutorService service;

    private BlinkLedTask() {

    }

    public void run(String[] frequency) {
        stop();

        task = new Task(frequency);
        service = Executors.newSingleThreadExecutor(RpiThreadFactory.INSTANCE);
        service.execute(task);
    }

    public void stop() {
        if(Objects.isNull(task) || Objects.isNull(service)) {
            return;
        }

        try {
            task.terminate();
            service.shutdownNow();
        } catch (Exception cause) {
            logger.error(cause.getMessage(), cause);
        }       
    }

    private static class Task implements Runnable {

        private volatile boolean running = true;
        private String[] frequency;
        private volatile Logger logger = RpiLogger.getLogger(getClass());

        private Task(String[] frequency) {
            this.frequency = frequency;
        }       

        @Override
        public void run() {
            while(running && !Thread.interrupted()) {
                try {
                    resetLed();
                    blinkLed();
                } catch (Throwable cause) {
                    logger.error(cause.getMessage(), cause);
                    running = false;

                    try {
                        resetLed();             
                    } catch (Throwable ignore) {
                    }
                } 
            }
        }

        private void resetLed() throws Exception {
            executeScript(Script.BLINK_LED_RESET);      
        }

        private void blinkLed() throws Exception {
            executeScript(Script.BLINK_LED, new String[]{frequency[0], frequency[1], frequency[2]});        
        }

        private void executeScript(Script script, String... args) {
            ScriptExecutor scriptExecutor = new ScriptExecutor(ScriptExecutor.BASH, script);
            scriptExecutor.execute(true, args);
        }

        private void terminate() {
            logger.info("Stopping - " + Thread.currentThread().getName());
            running = false;
        }
    }
}

This a Singleton and the which runs a shell script until it is stopped. This class can be called from anywhere, so I need to stop the thread, if there is any currently executing the shell script, before creating a new Thread.

For testing purpose I have executed the run() method of this class when the context is initialized and called the stop() at the time of destroy.

I have redeploy the war file after removing code run(), I was expecting that the stop() will terminate the task, but it didn't.

I also have tried a different implementation of the run() and stop():

public void run(String[] frequency) {
    stop();

    task = new Task(frequency);
    Thread thread = RpiThreadFactory.INSTANCE.newThread(task);
    tasks.add(ImmutablePair.of(thread, task));
    thread.start();
}

public void stop() {
    for(ImmutablePair<Thread, Task> pair : tasks) {
        try {
            pair.right.terminate();
            pair.left.join();
        } catch (Exception ex) {

        }           
    }
}

Here the tasks is private ArrayList<ImmutablePair<Thread, Task>> tasks = new ArrayList<ImmutablePair<Thread,Task>>();. The ImmutablePair belongs to commons-lang3. But I received java.util.ConcurrentModificationException on the iteration of the enhanced for loop. The cause I don't know.

Update

When the server get shutdown the stop() is working as expected. I am using Jetty.

Update

RpiThreadFactory:

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.log4j.Logger;

import com.edfx.rpi.app.utils.logger.RpiLogger;

public enum RpiThreadFactory implements ThreadFactory {
    INSTANCE;

    private final AtomicInteger poolNumber = new AtomicInteger(1);
    private final Logger logger = RpiLogger.getLogger(getClass());
    private final ThreadGroup threadGroup;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    private RpiThreadFactory() {
        SecurityManager securityManager = System.getSecurityManager();
        threadGroup = (securityManager != null) ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "RpiPool-" + poolNumber.getAndIncrement() + "-Thread-";

    }

    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(threadGroup, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {

            public void uncaughtException(Thread thread, Throwable cause) {
                logger.error(cause.getMessage(), cause);
            }
        });

        return thread;
    }
}

ScriptExecutor:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

import com.edfx.rpi.app.utils.logger.RpiLogger;

public class ScriptExecutor {

    private static final Logger LOGGER = RpiLogger.getLogger(ScriptExecutor.class);
    public static final String BASH = "/bin/bash";

    private Script script;
    private Process process;
    private String output;
    private int exitValue;

    public ScriptExecutor(Script script) {
        this.script = script;

    }

    public void execute(boolean destroyProcess, String... args) throws ScriptNotExistException {                
        if(!script.exists()) {
            throw new ScriptNotExistException(script.getScriptName() + " does not exists.");
        }

        try {
            List<String> commands = new ArrayList<>();

            commands.add(BASH);
            commands.add(script.getAbsoultePath());

            if(Objects.nonNull(args)) {
                commands.addAll(Arrays.asList(args));
            }

            StringBuilder builder = new StringBuilder("Executing script: ");
            builder.append(script.getScriptName());

            if(Objects.nonNull(args) && args.length > 0) {
                builder.append(" with parameters: ");
                builder.append(StringUtils.join(args, " "));
            }

            LOGGER.info(builder.toString());

            ProcessBuilder processBuilder = new ProcessBuilder(commands.toArray(new String[commands.size()]));
            process = processBuilder.start();

            StringBuilder outputBuilder = new StringBuilder();
            InputStream inputStream = process.getInputStream();
            InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

            String line = StringUtils.EMPTY;

            while ((line = bufferedReader.readLine()) != null) {
                outputBuilder.append(line);
                outputBuilder.append("\n");
            }

            process.waitFor();

            exitValue = process.exitValue();
            LOGGER.info("Process for: " + script.getScriptName() + " is executed. Exit value: " + exitValue);

            if(destroyProcess) {
                destroyProcess();
            }

            output = outputBuilder.toString();
        } catch (Exception cause) {
            throw new ScriptExecutionException(cause);
        }       
    }

    public String getOutput() {
        return output;
    }

    public int getExitValue() {
        return exitValue;
    }

    public void destroyProcess() {
        if(Objects.nonNull(process)) {
            LOGGER.info("Process for: " + script.getScriptName() + " is destroyed.");
            process.destroy();
        }
    }
}

Purpose

This is a web application running in Jetty web container. The server is installed in an embedded hardware java enabled. How this hardware has a LED attached to it. The application accepts external request, which can be REST and start-stops the LED. So the LED can start blinking for any request; but it serves only one request at a time.

This is why I have the stop which stops previously running process, if there is any. The stop works for normal condition.

But I saw that while the LED is blinking and I did a deployment without stopping the server the running thread doesn't stops. If I stop the server and did the deployment and the start again, the running thread kills at this time.

The thread loops in the while and executes a Process to the native. This Process is an one time job, so this Process is not making the thread to get killed.

To reproduce the issue what I did I created the thread when the context is initialized and tried to kill it when it is destroyed. Now if I write something in the contextDestroyed I can see them get executed.

I don't understand why stopping the server kills the thread not when I redeploy.

like image 505
Tapas Bose Avatar asked Oct 21 '22 01:10

Tapas Bose


1 Answers

You should call process.destroy() on instance of Process returned by processBuilder.start(). Actually what you do when calling BlinkLedTask.terminate() is just setting some flag. You should at this point call process.destroy().

Below I present an example how you can rewrite this.It does not involve your class ScriptExecutor (of course you can move your logic there and return instance of process to BlinkLedTask when calling blinkLed()).

The main difference here is that I'm keeping the reference to instance of Process in field blinkLedProcess and when terminate() is called I'm directly calling process.destroy() to destroy the process.

You wrote that "When the server get shutdown the stop() is working as expected. I am using Jetty." Yes indeed. This is because by calling processBuilder.start(); you create subprocess of your main jetty process. When you kill jetty all its subproceses are also killed. If you don't kill jetty you need to manually kill the subprocess by calling destroy() method.

It should be something like:

public enum BlinkLedTask {
(...)

    private Process resetLedProcess;
    private Process blinkLedProcess;

(...)
   private void blinkLed() throws Exception {
      String[] args = new String[] { frequency[0], frequency[1], frequency[2] };

      List<String> commands = new ArrayList<>();

      //commands.add(BASH);
      commands.add(script.getAbsoultePath());

      if (Objects.nonNull(args)) {
        commands.addAll(Arrays.asList(args));
      }

      StringBuilder builder = new StringBuilder("Executing script: ");
      builder.append(script.getAbsoultePath());

      if (Objects.nonNull(args) && (args.length > 0)) {
        builder.append(" with parameters: ");
        builder.append(StringUtils.join(args, " "));
      }


      ProcessBuilder processBuilder = new ProcessBuilder(commands.toArray(new String[commands.size()]));

      blinkLedProcess = processBuilder.start();

      StringBuilder outputBuilder = new StringBuilder();
      InputStream inputStream = blinkLedProcess.getInputStream();
      InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
      BufferedReader bufferedReader = new BufferedReader(inputStreamReader);

      String line = StringUtils.EMPTY;

      while ((line = bufferedReader.readLine()) != null) {
        outputBuilder.append(line);
        outputBuilder.append("\n");
      }


      blinkLedProcess.waitFor();

      int exitValue = blinkLedProcess.exitValue();
      System.out.println(
        "Process for: " + Script.BLINK_LED.getAbsoultePath() + " is executed. Exit value: " + exitValue);


    }

(...)

   private void terminate() {
      System.out.println("Stopping - " + Thread.currentThread().getName());
      running = false;
      if (resetLedProcess != null) {
        resetLedProcess.destroy();
        System.out.println("Destroyed reset process");
      }
      if (blinkLedProcess != null) {
        blinkLedProcess.destroy();
        System.out.println("Destroyed blink process");
      }
    }
(...)
}
like image 157
walkeros Avatar answered Oct 29 '22 00:10

walkeros