
Properly doing multithreading and thread pools with JavaFX Tasks

I let users submit multiple files from a FileChooser to be processed by some code. Processing involves IO to read each file, followed by the actual heavy computation on the loaded data. The user is allowed to select multiple files, and since processing one file does not depend on any of the other selected files, this is much easier to handle with threads.

In addition, the user needs to have a list of buttons, one for each Task to cancel, and a "Cancel All" button. Therefore I have to consider the ability to selectively or collectively kill one or all of the Tasks.

The last requirement is that I don't let the user choke the system by opening a ton of files. So I figure I need a thread pool with a limited number of threads (let's say I cap it at 4, as an arbitrary number).

I am unsure how to properly go about setting this all up. I have the logic of what I need to do but using the right classes is where I am stuck.

I've checked this resource already, so if the answer is somehow in there then I've misread the article.

  • Are there any JavaFX classes that can help me with this situation?

  • If not, how would I mix a Task with some kind of thread pool? Do I have to make my own thread pool or is there one that is already provided for me?

  • Am I to make a singleton somewhere that contains the max number of threads I am willing to allow the user?

I would prefer to use one already in Java library since I am not a multithreading expert, and am worried that I could potentially do it wrong. Since thread bugs appear to be the most evil thing on the planet to debug, I'm trying very hard to make sure I do this as correctly as possible.

If there are no ways to do this and I have to roll my own implementation, what is the best way to go about doing this?

EDIT: I should note that I am generally new to threads. I have used them before and I'm reading books on them, but this will be my first major use of them and I'd really like to do it properly.

Water asked Apr 19 '15 17:04


1 Answer

JavaFX has a javafx.concurrent API; in particular, the Task class fits your use case very nicely. This API is designed to work in conjunction with the java.util.concurrent API. For example, Task is a subclass of FutureTask, so it can be submitted to an Executor. Since you want to use a thread pool, you can create an Executor that implements a thread pool for you, and submit your tasks to it:

final int MAX_THREADS = 4 ;

Executor exec = Executors.newFixedThreadPool(MAX_THREADS);

As these threads are running in the background of a UI application, you probably don't want them to prevent application exit. You can achieve this by making the threads created by your executor daemon threads:

Executor exec = Executors.newFixedThreadPool(MAX_THREADS, runnable -> {
    Thread t = new Thread(runnable);
    t.setDaemon(true);
    return t ;
});

The resulting executor will have a pool of up to MAX_THREADS threads. If tasks are submitted when no threads are available, they will wait in a queue until a thread becomes available.
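
To see the queueing behaviour in isolation, here is a minimal sketch that uses only java.util.concurrent (no JavaFX), so it runs on a plain JDK. The names BoundedPoolDemo and peakConcurrency and the 20 ms sleep are invented for this illustration. It submits more jobs than there are threads and records how many ran at the same time:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {

    // Runs taskCount short jobs on a pool of maxThreads daemon threads and
    // returns the highest number of jobs observed running at the same time.
    // (Hypothetical helper, written only for this demo.)
    static int peakConcurrency(int taskCount, int maxThreads) {
        ExecutorService exec = Executors.newFixedThreadPool(maxThreads, runnable -> {
            Thread t = new Thread(runnable);
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        });

        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();

        for (int i = 0; i < taskCount; i++) {
            futures.add(exec.submit(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            }));
        }

        try {
            for (Future<?> f : futures) {
                f.get(); // wait for every job, including those that had to queue
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("demo job failed", e);
        } finally {
            exec.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // 12 jobs, 4 threads: the reported peak never exceeds 4
        System.out.println("peak concurrency: " + peakConcurrency(12, 4));
    }
}
```

All 12 jobs complete, but no more than 4 ever run at once; the rest wait in the executor's queue.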

To implement the actual Task, there are a few things to bear in mind:

You must not update the UI from a background thread. Since your Task is submitted to the executor above, its call() method will be invoked on a background thread. If you really need to change the UI during the execution of the call() method, you can wrap the code that changes the UI in Platform.runLater(...), but it is better to structure things so that you avoid this situation. In particular, Task has a set of updateXXX(...) methods (updateProgress(...), updateMessage(...), and so on) that change the values of the corresponding Task properties on the FX Application Thread. Your UI elements can bind to these properties as needed.

It is advisable for the call method not to access any shared data (other than via the updateXXX(...) methods mentioned above). Instantiate your Task subclass setting only final variables, have the call() method compute a value, and return the value.

For canceling the Task, the Task class defines a built-in cancel() method. If you have a long-running call() method, you should periodically check the value of isCancelled() and stop doing work if it returns true.
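
The last two points can be tried out without a JavaFX runtime. The sketch below is an analogue in plain java.util.concurrent, not the JavaFX API itself: it uses a Callable and Future.cancel(true), and checks the thread's interrupt flag where a Task would call isCancelled(). The names CancellableJobDemo, SlowJob and cancelStopsJob are hypothetical, and the two latches exist only so the demo can observe the worker:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancellableJobDemo {

    // Hypothetical job: its input is fixed at construction and it returns a value,
    // rather than reading or writing shared mutable state.
    static final class SlowJob implements Callable<Integer> {
        private final int totalSteps;
        private final CountDownLatch started; // demo-only signal: work has begun
        private final CountDownLatch exited;  // demo-only signal: call() is finishing

        SlowJob(int totalSteps, CountDownLatch started, CountDownLatch exited) {
            this.totalSteps = totalSteps;
            this.started = started;
            this.exited = exited;
        }

        @Override
        public Integer call() {
            started.countDown();
            int done = 0;
            try {
                for (; done < totalSteps; done++) {
                    if (Thread.currentThread().isInterrupted()) {
                        break; // cancellation was requested between steps
                    }
                    Thread.sleep(10); // stand-in for one chunk of real work
                }
            } catch (InterruptedException e) {
                // cancel(true) interrupted a blocking call: restore the flag and stop
                Thread.currentThread().interrupt();
            } finally {
                exited.countDown();
            }
            return done;
        }
    }

    // Starts a job that would take about 10 seconds, cancels it once it is running,
    // and reports whether the worker actually stopped within 2 seconds.
    static boolean cancelStopsJob() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        try {
            Future<Integer> future = exec.submit(new SlowJob(1_000, started, exited));
            started.await();     // make sure the job is really running
            future.cancel(true); // request cancellation and interrupt the worker
            boolean exitedInTime = exited.await(2, TimeUnit.SECONDS);
            return future.isCancelled() && exitedInTime;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("job stopped after cancel: " + cancelStopsJob());
    }
}
```

The point of the design is that cancellation is cooperative: cancel only requests a stop, and the job stops promptly only because its loop checks for the request and handles the interrupt.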

Here's a basic example:

import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javafx.application.Application;
import javafx.beans.property.ReadOnlyObjectWrapper;
import javafx.beans.value.ChangeListener;
import javafx.concurrent.Task;
import javafx.concurrent.Worker;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TableCell;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableView;
import javafx.scene.control.cell.ProgressBarTableCell;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.HBox;
import javafx.stage.FileChooser;
import javafx.stage.Stage;

public class FileTaskExample extends Application {

    private static final Random RNG = new Random();

    private static final int MAX_THREADS = 4 ;

    private final Executor exec = Executors.newFixedThreadPool(MAX_THREADS, runnable -> {
        Thread t = new Thread(runnable);
        t.setDaemon(true);
        return t ;
    });

    @Override
    public void start(Stage primaryStage) {

        // table to display all tasks:
        TableView<FileProcessingTask> table = new TableView<>();

        TableColumn<FileProcessingTask, File> fileColumn = new TableColumn<>("File");
        fileColumn.setCellValueFactory(cellData -> new ReadOnlyObjectWrapper<File>(cellData.getValue().getFile()));
        fileColumn.setCellFactory(col -> new TableCell<FileProcessingTask, File>() {
            @Override
            public void updateItem(File file, boolean empty) {
                super.updateItem(file, empty);
                if (empty) {
                    setText(null);
                } else {
                    setText(file.getName());
                }
            }
        });
        fileColumn.setPrefWidth(200);

        TableColumn<FileProcessingTask, Worker.State> statusColumn = new TableColumn<>("Status");
        statusColumn.setCellValueFactory(cellData -> cellData.getValue().stateProperty());
        statusColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, Double> progressColumn = new TableColumn<>("Progress");
        progressColumn.setCellValueFactory(cellData -> cellData.getValue().progressProperty().asObject());
        progressColumn.setCellFactory(ProgressBarTableCell.forTableColumn());
        progressColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, Long> resultColumn = new TableColumn<>("Result");
        resultColumn.setCellValueFactory(cellData -> cellData.getValue().valueProperty());
        resultColumn.setPrefWidth(100);

        TableColumn<FileProcessingTask, FileProcessingTask> cancelColumn = new TableColumn<>("Cancel");
        cancelColumn.setCellValueFactory(cellData -> new ReadOnlyObjectWrapper<FileProcessingTask>(cellData.getValue()));
        cancelColumn.setCellFactory(col -> {
            TableCell<FileProcessingTask, FileProcessingTask> cell = new TableCell<>();
            Button cancelButton = new Button("Cancel");
            cancelButton.setOnAction(e -> cell.getItem().cancel());

            // listener for disabling button if task is not running:
            ChangeListener<Boolean> disableListener = (obs, wasRunning, isNowRunning) -> 
                cancelButton.setDisable(! isNowRunning);

            cell.itemProperty().addListener((obs, oldTask, newTask) -> {
                if (oldTask != null) {
                    oldTask.runningProperty().removeListener(disableListener);
                }
                if (newTask == null) {
                    cell.setGraphic(null);
                } else {
                    cell.setGraphic(cancelButton);
                    cancelButton.setDisable(! newTask.isRunning());
                    newTask.runningProperty().addListener(disableListener);
                }
            });

            return cell ;
        });
        cancelColumn.setPrefWidth(100);

        table.getColumns().addAll(Arrays.asList(fileColumn, statusColumn, progressColumn, resultColumn, cancelColumn));

        Button cancelAllButton = new Button("Cancel All");
        cancelAllButton.setOnAction(e -> 
            table.getItems().stream().filter(Task::isRunning).forEach(Task::cancel));

        Button newTasksButton = new Button("Process files");
        FileChooser chooser = new FileChooser();
        newTasksButton.setOnAction(e -> {
            List<File> files = chooser.showOpenMultipleDialog(primaryStage);
            if (files != null) {
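                // submit each task to the pool, then show it in the table
                // (explicit loop: Stream.peek is meant for debugging, not side effects)
                for (File file : files) {
                    FileProcessingTask task = new FileProcessingTask(file);
                    exec.execute(task);
                    table.getItems().add(task);
                }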
            }
        });

        HBox controls = new HBox(5, newTasksButton, cancelAllButton);
        controls.setAlignment(Pos.CENTER);
        controls.setPadding(new Insets(10));

        BorderPane root = new BorderPane(table, null, null, controls, null);

        Scene scene = new Scene(root, 800, 600);
        primaryStage.setScene(scene);
        primaryStage.show();
    }

    public static class FileProcessingTask extends Task<Long> {

        private final File file ;

        public FileProcessingTask(File file) {
            this.file = file ;
        }

        public File getFile() {
            return file ;
        }

        @Override
        public Long call() throws Exception {

            // just to show you can return the result of the computation:
            long fileLength = file.length();

            // dummy processing, in real life read file and do something with it:
            int delay = RNG.nextInt(50) + 50 ;
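            for (int i = 0 ; i < 100; i++) {
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException interrupted) {
                    // cancel() interrupts this thread; the check below handles it
                }

                // check for cancellation and bail if cancelled:
                if (isCancelled()) {
                    updateProgress(0, 100);
                    break ;
                }

                // i + 1 steps are complete, so progress reaches 100% on the last pass:
                updateProgress(i + 1, 100);
            }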

            return fileLength ;
        }
    }

    public static void main(String[] args) {
        launch(args);
    }
}
James_D answered Sep 19 '22 13:09