
How to create a blocking background loader in Java 8?


Question

How do you create a proper background loader in Java 8? The conditions:

  • data should be loaded in background
  • after the loading the data should be displayed
  • while data are loaded no further requests should be accepted
  • if there were requests while the data were loaded another loading should be scheduled after a certain timeout (e. g. 5 seconds)

The purpose is, for example, to accept reload requests without flooding the database with them.

MCVE

Here's an MCVE. It consists of a background task which simulates the loading by simply invoking Thread.sleep for 2 seconds. The task is scheduled every second, which naturally leads to overlapping background loading tasks. That overlap is what should be avoided.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    public class LoadInBackgroundExample {

      /**
       * A simple background task which should perform the data loading operation.
       * In this minimal example it simply invokes Thread.sleep
       */
      public static class BackgroundTask implements Runnable {

        private int id;

        public BackgroundTask(int id) {
          this.id = id;
        }

        /**
         * Sleep for a given amount of time to simulate loading.
         */
        @Override
        public void run() {
          try {
            System.out.println("Start #" + id + ": " + Thread.currentThread());
            long sleepTime = 2000;
            Thread.sleep(sleepTime);
          } catch (InterruptedException e) {
            e.printStackTrace();
          } finally {
            System.out.println("Finish #" + id + ": " + Thread.currentThread());
          }
        }
      }

      /**
       * CompletableFuture which simulates loading and showing data.
       * @param taskId Identifier of the current task
       */
      public static void loadInBackground(int taskId) {

        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(taskId);

        // "load" the data asynchronously
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
          @Override
          public String get() {
            CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);
            try {
              future.get();
            } catch (InterruptedException | ExecutionException e) {
              e.printStackTrace();
            }
            return "task " + backgroundTask.id;
          }
        });

        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
          System.out.println("Background task finished:" + x);
        });
      }

      public static void main(String[] args) {

        // runnable which invokes the background loader every second
        Runnable trigger = new Runnable() {
          int taskId = 0;

          public void run() {
            loadInBackground(taskId++);
          }
        };

        // create scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

        // cancel the scheduler and the application after 10 seconds
        scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS);

        try {
          beeperHandle.get();
        } catch (Throwable th) {
          // expected: get() throws CancellationException once the handle is cancelled
        }

        System.out.println("Cancelled");
        System.exit(0);
      }
    }

The output is this:

    Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Background task finished:task 0
    Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Background task finished:task 1
    Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Background task finished:task 2
    Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Background task finished:task 3
    Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Background task finished:task 4
    Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Background task finished:task 5
    Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Background task finished:task 6
    Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
    Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Background task finished:task 7
    Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
    Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
    Background task finished:task 8
    Cancelled

The goal is to have, for example, #1 and #2 skipped because #0 is still running.

Problem

Where should the blocking mechanism go? Should synchronization be used, or an AtomicBoolean? And if so, should it be inside the get() method or elsewhere?

Asked by Roland, Nov 11 '19 at 06:11


2 Answers

You already have a thread pool to execute the task. Running the task in yet another asynchronous executor (the ForkJoinPool that CompletableFuture uses by default) is unnecessary and only complicates things.

Keep it simple:

    public static void loadInBackground(int taskId) {
        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(taskId);
        // no need to run it asynchronously, as it is already running in the executor
        backgroundTask.run();
    }

When you schedule a task with scheduleAtFixedRate, the ScheduledExecutorService ensures that only one execution of that task runs at a time. From its Javadoc:

Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
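The non-overlap guarantee quoted above can be checked with a small experiment. The following is a minimal sketch and not part of the original answer: the class name FixedRateNoOverlapDemo, the helper maxConcurrentRuns and all timings are made up for illustration. It schedules a task that takes longer than its period on a pool with several threads and records how many runs were ever in flight at once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateNoOverlapDemo {

    /**
     * Schedules a task that takes taskMs at a period of periodMs, observes it
     * for observeMs and returns the highest number of simultaneous runs seen.
     * (Hypothetical helper, written only for this illustration.)
     */
    static int maxConcurrentRuns(long periodMs, long taskMs, long observeMs)
            throws InterruptedException {
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        // several threads on purpose: the guarantee is per scheduled task,
        // it does not depend on the pool having a single thread
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);

        scheduler.scheduleAtFixedRate(() -> {
            int inFlight = running.incrementAndGet();
            maxSeen.accumulateAndGet(inFlight, Math::max);
            try {
                Thread.sleep(taskMs); // simulated loading
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);

        Thread.sleep(observeMs);
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        return maxSeen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // the task (300 ms) is three times longer than its period (100 ms)
        System.out.println("max concurrent runs: " + maxConcurrentRuns(100, 300, 1000));
    }
}
```

Note that this only covers the "no overlap" condition. Late executions are not skipped: they start as soon as the previous run finishes, so the loads run back to back instead of being dropped.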

Answered by Mạnh Quyết Nguyễn, Sep 27 '22 at 20:09


Taking the following as the requirements:

  • data should be loaded in background
  • after the loading the data should be displayed
  • while data are loaded no further requests should be accepted
  • if there were requests while the data were loaded another loading should be scheduled after a certain timeout (e. g. 5 seconds)

The solution can be built on Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue:

    import java.time.Duration;
    import java.time.Instant;
    import java.time.LocalTime;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    public class SingleThreadedLoader {

      private static class BackgroundTask extends CompletableFuture<String> {

        private final String query;

        private BackgroundTask(String query) {
          this.query = query;
        }

        public String getQuery() {
          return query;
        }
      }

      private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();
      // while data are loaded no further requests should be accepted
      private final Executor executor = Executors.newSingleThreadExecutor();
      private final int delaySeconds;
      private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);

      public SingleThreadedLoader(int delaySeconds) {
        this.delaySeconds = delaySeconds;
        setupLoading();
      }

      public BackgroundTask loadInBackground(String query) {
        log("Enqueued query " + query);
        BackgroundTask task = new BackgroundTask(query);
        tasks.add(task);
        return task;
      }

      private void setupLoading() {
        // data should be loaded in background
        executor.execute(() -> {
          while (true) {
            try {
              // if there were requests while the data were loaded
              // another loading should be scheduled after a certain timeout (e. g. 5 seconds)
              Instant prev = lastExecution.get();
              // getSeconds() instead of toSeconds(): toSeconds() only exists since Java 9
              long delay = Duration.between(prev, Instant.now()).getSeconds();
              if (delay < delaySeconds) {
                log("Waiting for " + delaySeconds + " seconds before next data loading");
                TimeUnit.SECONDS.sleep(delaySeconds - delay);
              }
              BackgroundTask task = tasks.take();
              try {
                String query = task.getQuery();
                String data = loadData(query);
                task.complete(data);
              } catch (Exception e) {
                task.completeExceptionally(e);
              }
              lastExecution.set(Instant.now());
            } catch (InterruptedException e) {
              log(e.getMessage());
              return;
            }
          }
        });
      }

      private String loadData(String query) {
        try {
          log("Loading data for " + query);
          TimeUnit.SECONDS.sleep(2);
          log("Loaded data for " + query);
          return "Result " + query;
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }

      private static void log(String str) {
        String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);
        String thread = Thread.currentThread().getName();
        System.out.println(time + ' ' + thread + ": " + str);
      }

      public static void main(String[] args) throws Exception {
        SingleThreadedLoader loader = new SingleThreadedLoader(5);
        // after the loading the data should be displayed
        loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);
        loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);
        loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);

        log("Do another work in the main thread");

        TimeUnit.SECONDS.sleep(30);
      }
    }

After the execution the stdout will have the following output:

    10:29:26 main: Enqueued query 1
    10:29:26 pool-1-thread-1: Loading data for 1
    10:29:26 main: Enqueued query 2
    10:29:26 main: Enqueued query 3
    10:29:26 main: Do another work in the main thread
    10:29:28 pool-1-thread-1: Loaded data for 1
    10:29:28 pool-1-thread-1: Result 1
    10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading
    10:29:33 pool-1-thread-1: Loading data for 2
    10:29:36 pool-1-thread-1: Loaded data for 2
    10:29:36 pool-1-thread-1: Result 2
    10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading
    10:29:41 pool-1-thread-1: Loading data for 3
    10:29:43 pool-1-thread-1: Loaded data for 3
    10:29:43 pool-1-thread-1: Result 3

Answered by Evgeniy Khyst, Sep 27 '22 at 20:09
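The question also asks whether an AtomicBoolean could serve as the blocking mechanism. Neither answer shows that variant, so here is a minimal sketch under stated assumptions. The class CoalescingLoader and its methods request, completedLoads and shutdown are hypothetical names, and folding every request that arrives during a load into a single delayed reload is one possible reading of the requirements, not the method of either answerer.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CoalescingLoader {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // true while a load is running or a follow-up reload is waiting for its delay
    private final AtomicBoolean busy = new AtomicBoolean(false);
    // set by requests that arrive while busy
    private final AtomicBoolean reloadRequested = new AtomicBoolean(false);
    private final AtomicInteger completedLoads = new AtomicInteger();
    private final Runnable loadAction;
    private final long reloadDelayMs;

    public CoalescingLoader(Runnable loadAction, long reloadDelayMs) {
        this.loadAction = loadAction;
        this.reloadDelayMs = reloadDelayMs;
    }

    /** Returns true if this request started a load, false if it was folded into a later reload. */
    public boolean request() {
        if (busy.compareAndSet(false, true)) {
            scheduler.execute(this::load);
            return true;
        }
        reloadRequested.set(true);
        return false;
    }

    private void load() {
        // requests made before this point are covered by the load that starts now
        reloadRequested.set(false);
        try {
            loadAction.run();
        } finally {
            completedLoads.incrementAndGet();
            if (reloadRequested.getAndSet(false)) {
                scheduleReload(); // stay busy until the follow-up load is done
            } else {
                busy.set(false);
                // a request may have slipped in just before busy was reset
                if (reloadRequested.getAndSet(false) && busy.compareAndSet(false, true)) {
                    scheduleReload();
                }
            }
        }
    }

    private void scheduleReload() {
        scheduler.schedule(this::load, reloadDelayMs, TimeUnit.MILLISECONDS);
    }

    public int completedLoads() {
        return completedLoads.get();
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // simulated 2 s load, follow-up reload 5 s after a load that received requests
        CoalescingLoader loader = new CoalescingLoader(() -> sleep(2000), 5000);
        for (int i = 0; i < 4; i++) {
            System.out.println("request #" + i + " started a load: " + loader.request());
            Thread.sleep(500);
        }
        Thread.sleep(10_000);
        System.out.println("completed loads: " + loader.completedLoads());
        loader.shutdown();
    }
}
```

In this sketch the guard sits in request(), outside the loading code itself, so callers never block. Requests that arrive during a load are not queued one by one as in SingleThreadedLoader; they only set a flag, which keeps the number of database hits bounded no matter how many requests come in.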