 

JavaFX version of ExecutorService

I made a Java utility that searches the XML inside an arbitrary number of zip-based archives (twx files).

Originally this was a command-line utility with no threading.

When I moved it to JavaFX, the UI froze during searches, so I moved all the searching into a Task object to fix that.

I needed some way to track progress, so I used a progress property bound to a ProgressBar.

It worked great, but since I'm already multithreading, why not create a thread for each zip search? Unfortunately, that has not worked so well.

To keep track, I create a list of Tasks and then a main Task that manages them. I use progress and total properties to handle all the updates.

Here is the code:

public class TextSearch {
    final private SimpleDoubleProperty progress = new SimpleDoubleProperty();
    final private SimpleDoubleProperty total = new SimpleDoubleProperty();

/**
 * 
 * Kicks off a search. Creates a Task/Thread for each twx search.
 * @return A task object that maintains all of the twx searches.
 * 
 * @throws ZipException If the zip file is unreadable.
 * @throws IOException If the file is unreadable.
 * @throws JDOMException If the xml in the files are unreadable.
 * @throws InvalidTWXFile If the twx is corrupt.
 */
public Task<?> executeSearch() throws ZipException, IOException, JDOMException, InvalidTWXFile {
    //Loop through all registered twx files.
    Iterator<TWExport> rit = registered.iterator();
    Integer t = 0;
    //Create a task for each search
    final ArrayList<Task<?>> tasks = new ArrayList<Task<?>>();
    while(rit.hasNext())
    {
        final TWExport twx = rit.next();
        //Only run search if user selects to search it.
        if(twx.getSearchEnabled())
        {
            Task<Void> task = new Task<Void>() {
                @Override public Void call() {
                    informUser("Searching " + twx);
                    SearchResults proj = new SearchResults(twx);
                    searchResult.add(proj);
                    searchTwx(proj,twx);
                    twx.setResultCount(proj.getTotalCount());
                    informUser("Finished Searching " + twx);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        if (isCancelled()) {
                            updateMessage("Cancelled");
                        }
                    }
                    return null;
                }
            };
            tasks.add(task);
            t += twx.getObjects().size();
        } else
        {
            informUser("Skipping " + twx);
        }
    }
    total.setValue(t);


    //Create the main thread that will hold all individual searches.
    Task<Void> main = new Task<Void>() {
        @Override
        protected Void call() throws Exception {
            startTime = new Date();
            Iterator<Task<?>> it = tasks.iterator();
            while(it.hasNext())
            {
                Thread t = new Thread(it.next());
                t.start();
            }
            //Sometimes I get a hung thread in this loop
            while(!progress.getValue().equals(total.getValue()))
            {
                updateProgress(progress.getValue(), total.getValue());
                Thread.sleep(5000);
            }
            setEndTime(new Date());
            return null;
        }
    };

    new Thread(main).start();
    return main;
}

/**
 * Search through a twx file and add results to the project search results
 * @param proj  The parent SearchResults
 * @param twx   The TWExport to search
 */
private void searchTwx(SearchResults proj, TWExport twx) {
    Iterator<TWObject> it = twx.getObjects().iterator();
    //Iterate through the files and get the result
    while(it.hasNext())
    {
        TWObject object = it.next();
        progress.setValue(progress.getValue() + 1);
        if(searchArtifacts.matcher(object.getName()).find())
        {
            SearchResults result = object.searchContents(searchStr);
            if(result != null)
            {
                proj.add(result);
            }
        }
    }
}

Using a main thread seems very clunky, and sometimes it hangs in that polling loop when searching 10+ zip files at a time.

Is there a better way to do this, with an unknown number of Tasks? Is there anything like a JavaFX ExecutorService where I can add a bunch of Tasks, kick it off and monitor the progressProperty?

milesacul asked Oct 28 '15 14:10


1 Answer

There's no need for a JavaFX-specific executor service: the regular java.util.concurrent.ExecutorService works just fine, as Task is a subclass of FutureTask.
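As a minimal sketch of that point, the following uses plain java.util.concurrent.FutureTask as a stand-in for javafx.concurrent.Task, so it runs without JavaFX on the classpath. The class name PlainExecutorSketch, the method runAll, and the squaring "work" are made up for illustration; only the executor calls are the standard API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class PlainExecutorSketch {

    // Runs n FutureTasks on a fixed pool and returns the sum of their results.
    // Each task is a hypothetical unit of work that just squares its index.
    static int runAll(int n) {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            List<FutureTask<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final int value = i;
                tasks.add(new FutureTask<Integer>(() -> value * value));
            }

            // A FutureTask is a Runnable, so it can be handed straight to execute().
            tasks.forEach(exec::execute);

            int sum = 0;
            for (FutureTask<Integer> task : tasks) {
                sum += task.get(); // blocks until that task has finished
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(3)); // 1 + 4 + 9 = 14
    }
}
```

In a JavaFX application the blocking get() calls should stay off the FX Application Thread; observing each Task's properties is the usual alternative.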

Once you have a list of tasks, you can compute the overall progress as a function of the progress of each of them. For example, it might just be the sum of the progress of all, divided by the number of tasks. If each task has a different number of items to process, you might be able to do something more sophisticated.
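To illustrate the difference between the two approaches, here is a rough sketch in plain Java comparing a simple average of per-task progress with an average weighted by item count. The Snapshot type and both method names are hypothetical; with JavaFX Tasks the inputs could plausibly come from getWorkDone() and getTotalWork(), assuming each task reports its item counts through updateProgress.

```java
import java.util.Arrays;
import java.util.List;

class WeightedProgressSketch {

    /** Hypothetical per-task snapshot: items processed so far and items in total. */
    static final class Snapshot {
        final long done;
        final long total;

        Snapshot(long done, long total) {
            this.done = done;
            this.total = total;
        }
    }

    // Simple version: the mean of each task's own fraction complete.
    static double averageProgress(List<Snapshot> tasks) {
        if (tasks.isEmpty()) {
            return 0.0;
        }
        double sum = 0.0;
        for (Snapshot s : tasks) {
            sum += s.total == 0 ? 0.0 : (double) s.done / s.total;
        }
        return sum / tasks.size();
    }

    // Weighted version: all processed items divided by all items,
    // so a task with more items contributes proportionally more.
    static double weightedProgress(List<Snapshot> tasks) {
        long done = 0;
        long total = 0;
        for (Snapshot s : tasks) {
            done += s.done;
            total += s.total;
        }
        return total == 0 ? 0.0 : (double) done / total;
    }

    public static void main(String[] args) {
        // One small task half done, one large task not yet started.
        List<Snapshot> tasks = Arrays.asList(new Snapshot(50, 100), new Snapshot(0, 300));
        System.out.println(averageProgress(tasks));  // 0.25
        System.out.println(weightedProgress(tasks)); // 0.125
    }
}
```

The weighted figure tracks the amount of work actually remaining, which matters when the archives differ widely in size.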

Here's a simple SSCCE:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import javafx.application.Application;
import javafx.beans.binding.DoubleBinding;
import javafx.beans.property.DoubleProperty;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.SimpleDoubleProperty;
import javafx.beans.property.SimpleIntegerProperty;
import javafx.concurrent.Task;
import javafx.concurrent.Worker;
import javafx.geometry.Insets;
import javafx.geometry.Pos;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.ProgressBar;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.scene.layout.Priority;
import javafx.scene.layout.VBox;
import javafx.stage.Stage;

public class MultipleTaskTest extends Application {

    private final ExecutorService exec = Executors.newFixedThreadPool(5, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t ;
    });

    private final Random rng = new Random();


    @Override
    public void start(Stage primaryStage) {

        Label pendingTasksLabel = new Label();
        Button button = new Button("Launch tasks");
        TextArea log = new TextArea();

        DoubleProperty progress = new SimpleDoubleProperty(1);

        ProgressBar progressBar = new ProgressBar();
        progressBar.progressProperty().bind(progress);

        IntegerProperty pendingTasks = new SimpleIntegerProperty(0);
        pendingTasksLabel.textProperty().bind(pendingTasks.asString("Pending Tasks: %d"));

        button.disableProperty().bind(pendingTasks.greaterThan(0));

        button.setOnAction(e -> {
            int numTasks = rng.nextInt(5) + 4 ;

            List<Task<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                tasks.add(createRandomTask());
            }

            // rebind progress:
            progress.unbind();
            progress.bind( new DoubleBinding() {
                {
                    for (Task<Void> task : tasks) {
                        bind(task.progressProperty());
                    }
                }

                @Override
                public double computeValue() {
                    return tasks.stream().collect(Collectors.summingDouble(
                        task -> Math.max(task.getProgress(), 0)    
                    )) / numTasks;
                }
            });

            log.appendText("Submitting "+numTasks+" tasks\n");

            pendingTasks.set(numTasks);

            // log state of each task:
            tasks.forEach(task -> 
                task.stateProperty().addListener((obs, oldState, newState) -> {
                    log.appendText("\tTask "+newState+"\n");

                    // update pendingTasks if task moves out of running state:                  
                    if (oldState == Worker.State.RUNNING) {
                        pendingTasks.set(pendingTasks.get() - 1);
                    }
                }));

            tasks.forEach(exec::execute);
        });

        VBox root = new VBox(10, pendingTasksLabel, progressBar, log, button);
        root.setAlignment(Pos.CENTER);
        root.setPadding(new Insets(10));
        VBox.setVgrow(log, Priority.ALWAYS);

        primaryStage.setScene(new Scene(root, 400, 400));
        primaryStage.show();
    }

    @Override
    public void stop() {
        exec.shutdownNow() ;
    }

    private Task<Void> createRandomTask() {
        int numSteps = 100 + rng.nextInt(100);
        return new Task<Void>() {
            @Override
            public Void call() throws Exception {
                for (int i = 1; i <= numSteps; i++) {
                    Thread.sleep(50);
                    updateProgress(i, numSteps);
                }
                return null ;
            }
        };
    }

    public static void main(String[] args) {
        launch(args);
    }
}
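
A possible explanation for the hang described in the question, though it is not confirmed in this thread, is that progress.setValue(progress.getValue() + 1) is a non-atomic read-modify-write performed from several threads at once. Increments can be lost, in which case progress never equals total and the polling loop never exits. The sketch below shows one way to avoid both the lost updates and the polling, using a shared AtomicLong and a CountDownLatch in plain Java. The class and method names are hypothetical, and the inner loop stands in for the per-object search.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

class AtomicProgressSketch {

    // Runs numTasks workers that each "process" itemsPerTask items and
    // returns how many items were counted in total.
    static long countItems(int numTasks, int itemsPerTask) {
        AtomicLong processed = new AtomicLong();
        CountDownLatch allDone = new CountDownLatch(numTasks);
        ExecutorService exec = Executors.newFixedThreadPool(4);
        try {
            for (int t = 0; t < numTasks; t++) {
                exec.execute(() -> {
                    try {
                        for (int i = 0; i < itemsPerTask; i++) {
                            // Atomic increment: no update is lost under contention.
                            processed.incrementAndGet();
                        }
                    } finally {
                        // Signal completion even if the work throws.
                        allDone.countDown();
                    }
                });
            }
            // Wait for every worker instead of polling for progress == total.
            allDone.await();
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(countItems(10, 10_000)); // 100000
    }
}
```

In a JavaFX application the await() call belongs on a background thread; the state listeners in the SSCCE above serve the same purpose without blocking.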
James_D answered Oct 02 '22 23:10