Question
How do you create a proper background loader in Java 8? The conditions:
The purpose is to e. g. have reload requests accepted, but not the database flooded with the requests.
MCVE
Here's a MCVE. It consists of a background task which simulates the loading by simply invoking Thread.sleep for 2 seconds. The task is scheduled every second which naturally leads to an overlap of the background loading tasks, which should be avoided.
public class LoadInBackgroundExample { /** * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep */ public static class BackgroundTask implements Runnable { private int id; public BackgroundTask(int id) { this.id = id; } /** * Sleep for a given amount of time to simulate loading. */ @Override public void run() { try { System.out.println("Start #" + id + ": " + Thread.currentThread()); long sleepTime = 2000; Thread.sleep( sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Finish #" + id + ": " + Thread.currentThread()); } } } /** * CompletableFuture which simulates loading and showing data. * @param taskId Identifier of the current task */ public static void loadInBackground( int taskId) { // create the loading task BackgroundTask backgroundTask = new BackgroundTask( taskId); // "load" the data asynchronously CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask); try { future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return "task " + backgroundTask.id; } }); // display the data after they are loaded CompletableFuture<Void> future = completableFuture.thenAccept(x -> { System.out.println( "Background task finished:" + x); }); } public static void main(String[] args) { // runnable which invokes the background loader every second Runnable trigger = new Runnable() { int taskId = 0; public void run() { loadInBackground( taskId++); } }; // create scheduler ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS); // cancel the scheudler and the application after 10 seconds scheduler.schedule(() -> beeperHandle.cancel(true), 10, TimeUnit.SECONDS); try { beeperHandle.get(); } catch (Throwable th) { } System.out.println( "Cancelled"); System.exit(0); } }
The output is this:
Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main] Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main] Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main] Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main] Background task finished:task 0 Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main] Background task finished:task 1 Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main] Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main] Background task finished:task 2 Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main] Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main] Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main] Background task finished:task 3 Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main] Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main] Background task finished:task 4 Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main] Background task finished:task 5 Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main] Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main] Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main] Background task finished:task 6 Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main] Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main] Background task finished:task 7 Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main] Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main] Background task finished:task 8 Cancelled
The goal is to have e. g. #1 and #2 skipped because #0 is still running.
Problem
Where do you properly set the blocking mechanism? Should synchronization be used? Or some AtomicBoolean
? And if so, should it be inside the get()
method or elsewhere?
You already have a threadpool to execute the task. It's not necessarily and make thing complicated to run the task in another async executor (ForkJoinPool
when you use CompletableFuture
)
Make it simple:
public static void loadInBackground(int taskId) { // create the loading task BackgroundTask backgroundTask = new BackgroundTask(taskId); // No need to run in async, as it already in executor backgroundTask.run(); }
The ScheduledExecutorService will ensure only one task is run at a time when you invoked it with scheduleAtFixedRate
Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is executions will commence after initialDelay then initialDelay+period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.
Taking the following as the requirements:
- data should be loaded in background
- after the loading the data should be displayed
- while data are loaded no further requests should be accepted
- if there were requests while the data were loaded another loading should be scheduled after a certain timeout (e. g. 5 seconds)
The solution ca be build based on the Executors.newSingleThreadExecutor()
, CompletableFuture
and LinkedBlockingQueue
:
public class SingleThreadedLoader { private static class BackgroundTask extends CompletableFuture<String> { private final String query; private BackgroundTask(String query) { this.query = query; } public String getQuery() { return query; } } private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>(); // while data are loaded no further requests should be accepted private final Executor executor = Executors.newSingleThreadExecutor(); private final int delaySeconds; private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH); public SingleThreadedLoader(int delaySeconds) { this.delaySeconds = delaySeconds; setupLoading(); } public BackgroundTask loadInBackground(String query) { log("Enqueued query " + query); BackgroundTask task = new BackgroundTask(query); tasks.add(task); return task; } private void setupLoading() { // data should be loaded in background executor.execute(() -> { while (true) { try { // if there were requests while the data were loaded // another loading should be scheduled after a certain timeout (e. g. 5 seconds) Instant prev = lastExecution.get(); long delay = Duration.between(prev, Instant.now()).toSeconds(); if (delay < delaySeconds) { log("Waiting for 5 seconds before next data loading"); TimeUnit.SECONDS.sleep(delaySeconds - delay); } BackgroundTask task = tasks.take(); try { String query = task.getQuery(); String data = loadData(query); task.complete(data); } catch (Exception e) { task.completeExceptionally(e); } lastExecution.set(Instant.now()); } catch (InterruptedException e) { log(e.getMessage()); return; } } }); } private String loadData(String query) { try { log("Loading data for " + query); TimeUnit.SECONDS.sleep(2); log("Loaded data for " + query); return "Result " + query; } catch (InterruptedException e) { throw new RuntimeException(e); } } private static void log(String str) { String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME); String thread = Thread.currentThread().getName(); System.out.println(time + ' ' + thread + ": " + str); } public static void main(String[] args) throws Exception { SingleThreadedLoader loader = new SingleThreadedLoader(5); // after the loading the data should be displayed loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log); loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log); loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log); log("Do another work in the main thread"); TimeUnit.SECONDS.sleep(30); } }
After the execution the stdout will have the following output:
10:29:26 main: Enqueued query 1 10:29:26 pool-1-thread-1: Loading data for 1 10:29:26 main: Enqueued query 2 10:29:26 main: Enqueued query 3 10:29:26 main: Do another work in the main thread 10:29:28 pool-1-thread-1: Loaded data for 1 10:29:28 pool-1-thread-1: Result 1 10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading 10:29:33 pool-1-thread-1: Loading data for 2 10:29:36 pool-1-thread-1: Loaded data for 2 10:29:36 pool-1-thread-1: Result 2 10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading 10:29:41 pool-1-thread-1: Loading data for 3 10:29:43 pool-1-thread-1: Loaded data for 3 10:29:43 pool-1-thread-1: Result 3
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With