
Java concurrency pattern for external shared resource (smartcards)

I have a web server service where clients request a smartcard computation and get their result. The number of available smartcards can decrease or increase while the server is running; for example, I can physically add or remove a smartcard from the reader (other events, such as exceptions, can have the same effect).


A smartcard computation can take a while, so I have to optimize these jobs to use all available smartcards if there are concurrent requests to the web server.

I thought of working with a pool of smartcard threads. The unusual thing, at least for me, is that the pool should change its size depending not on the client requests but only on the smartcard availability.


I studied many examples of:

  • BlockingQueue: It looks good for storing requests and for parking threads that are waiting for something to do.
  • FutureTask: I can use this class to let the client wait for its answer, but which kind of executor should run the task?
  • ThreadPoolExecutor: Seems what I need, but with this I cannot change the pool size, moreover every thread should be linked to a single smartcard slot. This could be a solution if I could change the pool size (adding a thread when a smartcard is inserted and removing a thread when a smartcard is removed) and if I could assign a specific smartcard to each thread.

This is the smartcard control. I have one SmartcardWrapper per smartcard, and every smartcard has its own slot number.

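public class SmartcardWrapper {

    // Assumed buffer size; the real result length depends on the card API.
    private static final int OUTPUT_LENGTH = 256;

    private final int slot;

    public SmartcardWrapper(int slot) {
        this.slot = slot;
    }

    public byte[] compute(byte[] input) {
        byte[] out = new byte[OUTPUT_LENGTH];
        SmartcardApi.computerInput(slot, input, out); // native method
        return out;
    }
}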

I tried to create a thread pool with one thread per smartcard:

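private class SmartcardThread extends Thread {

    protected final SmartcardWrapper sw;

    public SmartcardThread(SmartcardWrapper sw) {
        this.sw = sw;
    }

    @Override
    public void run() {
        try {
            while (true) {
                byte[] input = queue.take(); // blocks until a request arrives
                byte[] output = sw.compute(input);
                // I have to return the output to the client
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting for work: restore the flag and let the thread end.
            Thread.currentThread().interrupt();
        }
    }
}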

Every thread waits on the same input queue:

// BlockingQueue is an interface, so a concrete implementation is needed.
BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();

But how do I return the output from the smartcard thread to the web server client? This makes me think that BlockingQueue is not my solution.

How should I approach this problem? Which concurrency pattern should I follow? Is it correct to assign one thread per smartcard, or should I simply use semaphores?

Tobia asked Dec 07 '15 16:12



3 Answers

Your assumption:

ThreadPoolExecutor: Seems what I need, but with this I cannot change the pool size, moreover every thread should be linked to a single smartcard slot.

is not right.

You can set the thread pool size dynamically.

Have a look at the following ThreadPoolExecutor APIs:

public void setMaximumPoolSize(int maximumPoolSize)

Sets the maximum allowed number of threads. This overrides any value set in the constructor. If the new value is smaller than the current value, excess existing threads will be terminated when they next become idle.

public void setCorePoolSize(int corePoolSize)

Sets the core number of threads. This overrides any value set in the constructor. If the new value is smaller than the current value, excess existing threads will be terminated when they next become idle. If larger, new threads will, if needed, be started to execute any queued tasks.
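
As a rough sketch of how these two setters could follow card availability: the class and method names below (CardPoolSizer, create, resize) are hypothetical and not from the question. The order of the two calls matters, because setMaximumPoolSize rejects a value below the current core size, and newer JDKs also reject a core size above the current maximum, both with an IllegalArgumentException. The maximum can never be 0, so the zero-card case is assumed to be handled elsewhere.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: keeps a pool's thread count equal to the number of inserted cards. */
final class CardPoolSizer {

    private CardPoolSizer() { }

    /** Creates a pool with exactly {@code cards} threads and an unbounded request queue. */
    static ThreadPoolExecutor create(int cards) {
        return new ThreadPoolExecutor(cards, cards, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Call when a card is inserted or removed; {@code cards} must be at least 1. */
    static synchronized void resize(ThreadPoolExecutor pool, int cards) {
        if (cards < 1) {
            throw new IllegalArgumentException("a ThreadPoolExecutor needs a maximum size of at least 1");
        }
        if (cards > pool.getMaximumPoolSize()) {
            // Growing: raise the maximum first so the core size never exceeds it.
            pool.setMaximumPoolSize(cards);
            pool.setCorePoolSize(cards);
        } else {
            // Shrinking (or unchanged): lower the core size first so the maximum never drops below it.
            pool.setCorePoolSize(cards);
            pool.setMaximumPoolSize(cards);
        }
    }
}
```

Shrinking does not interrupt a computation that is already running; as the documentation quoted here says, excess threads terminate when they next become idle. Note also that this only controls how many threads exist, not which card each thread talks to.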

Core and maximum pool sizes:

A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.

When a new task is submitted in method execute(java.lang.Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. But I would not recommend having that many threads. Set this value with caution.

Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
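
One consequence of these rules is easy to miss: an unbounded queue is never full, so a pool that uses one never grows past corePoolSize, however large maximumPoolSize is. A small demonstration, with hypothetical names (CoreVsMaxDemo, threadsStarted), that submits four blocking tasks to a pool with core size 1 and maximum size 4:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CoreVsMaxDemo {

    /** Submits blocking tasks to a pool (core 1, max 4) and reports how many threads it started. */
    static int threadsStarted(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 1L, TimeUnit.SECONDS, queue);
        CountDownLatch done = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        done.await(); // keep the worker busy until the count is taken
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            done.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Unbounded queue: extra tasks are queued, the pool stays at the core size.
        System.out.println(threadsStarted(new LinkedBlockingQueue<Runnable>(), 4));
        // Zero-capacity queue: every hand-off fails, so the pool grows to the maximum.
        System.out.println(threadsStarted(new SynchronousQueue<Runnable>(), 4));
    }
}
```

The first call reports 1 thread and the second reports 4. For a pool whose size should track the number of cards, this suggests setting the core and maximum sizes to the same value rather than relying on the maximum alone.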

EDIT:

For better utilization of the thread pool, if you know the maximum number of cards is 6, you can use

 ExecutorService executor = Executors.newFixedThreadPool(6);

OR

Ravindra babu answered Nov 14 '22 00:11



Have you considered using Apache Commons Pool at all?

You need to maintain a pool of SmartcardWrapper objects where each SmartcardWrapper represents a physical smartcard. Whenever you need to make a new computation, you borrow an object from the pool, do the calculation, and return the object to the pool so it can be reused by the next thread.

The pool itself is thread-safe and blocks when there are no available objects. All you need to do is implement an API to add SmartcardWrapper objects to the pool and remove them from it.
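
The borrow, compute, and return cycle can be sketched with the standard library alone. This is a stand-in built on LinkedBlockingQueue to show the shape of the idea, not the Apache Commons Pool API. Card, CardPool, and their methods are hypothetical names, and Card.compute simply echoes its input in place of the real native call.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for SmartcardWrapper; compute() echoes the input instead of calling a card. */
final class Card {
    final int slot;
    boolean present = true; // guarded by the CardPool lock
    Card(int slot) { this.slot = slot; }
    byte[] compute(byte[] input) { return input.clone(); }
}

final class CardPool {
    private final BlockingQueue<Card> idle = new LinkedBlockingQueue<>();

    /** Called by the card monitor; creates one Card object per insertion event. */
    synchronized Card cardInserted(int slot) {
        Card card = new Card(slot);
        idle.add(card);
        return card;
    }

    /** Called by the card monitor; a borrowed card is simply not put back when it returns. */
    synchronized void cardRemoved(Card card) {
        card.present = false;
        idle.remove(card);
    }

    private synchronized void giveBack(Card card) {
        if (card.present) {
            idle.add(card);
        }
    }

    /** Runs on the request thread: borrow a card, compute, and always return the card. */
    byte[] compute(byte[] input, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        Card card = idle.poll(timeout, unit); // blocks while every card is busy
        if (card == null) {
            throw new TimeoutException("no smartcard available");
        }
        try {
            return card.compute(input);
        } finally {
            giveBack(card);
        }
    }

    int idleCount() { return idle.size(); }
}
```

With this shape the request thread performs the computation itself, so the result is just the return value of compute and nothing has to be handed back from a worker thread.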

Vladimir G. answered Nov 14 '22 00:11



I might have found a reasonably simple solution based on the following assumptions:

  • a separate process manages (system-event) notifications for smartcards that become available or are removed.
  • a client does not care which smartcard it gets to use, as long as it can use one without interference.

These two assumptions actually make it easier to create a pooling (shared resources) solution: usually the pool itself is responsible for creating and removing resources when appropriate, and without that responsibility the pool becomes simpler. I also assume that a client that gets a smartcard from the pool can execute the required smartcard functions within its own execution thread (similar to how a connection from a database connection pool is used to query a database).

I have only done some minimal testing for the two classes shown below, and I'm afraid the bulk of the work is in writing (unit) tests that prove the pool works properly with concurrent client requests combined with adding and removing smartcard resources. If you do not want to do that, then the answer from user769771 is probably a better solution. But if you do, try it out, see if it fits. The idea is that only one resource-pool instance is created and used by all the clients and updated by the separate process that manages smartcard availability.

import java.util.*;
import java.util.concurrent.*;

/**
 * A resource pool that expects shared resources
 * to be added to and removed from the pool by an external process
 * (i.e. not by the pool itself, see {@link #add(Object)} and {@link #remove(Object)}).
 * <br>A {@link ResourcePoolValidator} can optionally be used.
 * @param <T> resource type handed out by the pool.
 */
public class ResourcePool<T> {

    private final Set<T> registered = Collections.newSetFromMap(new ConcurrentHashMap<T, Boolean>()); 
    /* Use a linked list as FIFO queue for resources to lease. */
    private final List<T> available = Collections.synchronizedList(new LinkedList<T>()); 
    private final Semaphore availableLock = new Semaphore(0, true); 

    private final ResourcePoolValidator<T> validator;

    public ResourcePool() {
        this(null);
    }

    public ResourcePool(ResourcePoolValidator<T> validator) {
        super();
        this.validator = validator;
    }

    /**
     * Add a resource to the pool.
     * @return true if resource is not already in the pool.
     */
    public synchronized boolean add(T resource) {

        boolean added = false;
        if (!registered.contains(resource)) {
            registered.add(resource);
            available.add(resource);
            availableLock.release();
            added = true;
        }
        return added;
    }

    /**
     * Removes a resource from the pool.
     * The resource might be in use (see {@link #isLeased(Object)})
     * in which case {@link ResourcePoolValidator#abandoned(Object)} will be called 
     * when the resource is no longer used (i.e. released). 
     * @return true if resource was part of the pool and removed from the pool.
     */
    public synchronized boolean remove(T resource) {

        // method is synchronized to prevent multiple threads calling add and remove at the same time 
        // which could in turn bring the pool in an invalid state.
        return registered.remove(resource);
    }

    /**
     * If the given resource is (or was, see also {@link #remove(Object)}) part of the pool,
     * a return value of true indicates the resource is in use / checked out.
     * <br>This is a relatively expensive method, do not call it frequently.
     */
    public boolean isLeased(T resource) {
        return !available.contains(resource);
    }

    /**
     * Try to get a shared resource for usage. 
     * If a resource is acquired, it must be {@link #release(Object)}d in a finally-block.
     * @return A resource that can be exclusively used by the caller.
     * @throws InterruptedException When acquiring a resource is interrupted.
     * @throws TimeoutException When a resource is not available within the given timeout period.
     */
    public T tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException, TimeoutException {

        T resource = null;
        long timeRemaining = tunit.toMillis(timeout);
        final long tend = System.currentTimeMillis() + timeRemaining;
        do {
            if (availableLock.tryAcquire(timeRemaining, TimeUnit.MILLISECONDS)) {
                resource = available.remove(0);
                if (registered.contains(resource)) {
                    boolean valid = false;
                    try {
                        valid = (validator == null ? true : validator.isValid(resource));
                    } catch (Exception e) {
                        // TODO: log exception
                        e.printStackTrace();
                    }
                    if (valid) {
                        break; // return the "checked out" resource
                    } else {
                        // remove invalid resource from pool
                        registered.remove(resource);
                        if (validator != null) {
                            validator.abandoned(resource);
                        }
                    }
                }
                // resource was removed from pool, try acquire again
                // note that this implicitly lowers the maximum available resources
                // (an acquired permit from availableLock goes unused).
                // TODO: retry puts us at the back of availableLock queue but should put us at the front of the queue
                resource = null;
            }
            timeRemaining = tend - System.currentTimeMillis();
        } while (timeRemaining > 0L);
        if (resource == null) {
            throw new TimeoutException("Unable to acquire a resource within " + tunit.toMillis(timeout) + " ms.");
        }
        return resource;
    }

    /**
     * This method must be called by the caller / client whenever {@link #tryAcquire(long, TimeUnit)}
     * has returned a resource. If the caller has determined the resource is no longer valid,
     * the caller should call {@link #remove(Object)} before calling this method.
     * @param resource no longer used.
     */
    public void release(T resource) {

        if (resource == null) {
            return;
        }
        if (registered.contains(resource)) {
            available.add(resource);
            availableLock.release();
        } else {
            if (validator != null) {
                validator.abandoned(resource);
            }
        }
    }

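    /** A snapshot copy of all resources registered in the pool. */
    public List<T> getRegisteredResources() {
        // Returning a List avoids the unchecked (T[]) cast of an Object[], which fails with a
        // ClassCastException as soon as a caller assigns the result to a typed array.
        return new ArrayList<T>(registered);
    }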

}

And a separate class with functions related to the separate process that manages smartcard availability.

import java.util.concurrent.TimeUnit;

/**
 * Used by a {@link ResourcePool} to validate a resource before handing it out for lease
 * (see {@link #isValid(Object)}) and to signal that a resource is no longer used (see {@link #abandoned(Object)}).
 */
public class ResourcePoolValidator<T> {

    /**
     * Override this method (it accepts every resource by default)
     * to validate a resource before handing it out for lease.
     * If this method returns false or throws an exception (which it preferably should not do),
     * the resource is removed from the pool.
     * @return true if the resource is valid for leasing
     */
    public boolean isValid(T resource) {
        return true;
    }

    /**
     * Called by the {@link ResourcePool#release(Object)} method when a resource is released by a caller
     * but the resource was removed from the pool while it was in use.
     * <br>Called by {@link ResourcePool#tryAcquire(long, TimeUnit)} if a resource is not valid
     * (see {@link #isValid(Object)}).
     * <br>Override this method (it does nothing by default) to create a notification of an unused resource.
     * Do NOT do any long-running processing here, as this method is called from a caller (client) thread.
     */
    public void abandoned(T resource) {
        // NO-OP
    }

}
vanOekel answered Nov 14 '22 00:11
