I need to build a pool of workers in Java where each worker has its own connected socket; when the worker thread runs, it uses the socket but keeps it open for later reuse. We decided on this approach because creating, connecting, and destroying sockets on an ad-hoc basis carried too much overhead. We therefore need a way to pre-initialize a pool of workers with their socket connections, ready to take on work, while keeping the socket resources safe from other threads (sockets are not thread safe). We need something along these lines...
public class SocketTask implements Runnable {
Socket socket;
public SocketTask(){
//create + connect socket here
}
public void run(){
//use socket here
}
}
On application startup, we want to initialize the workers and, hopefully, the socket connections somehow too...
MyWorkerPool pool = new MyWorkerPool();
for( int i = 0; i < 100; i++)
pool.addWorker( new WorkerThread());
As work is requested by the application, we send tasks to the worker pool for immediate execution...
pool.queueWork( new SocketTask(..));
Updated with Working Code
Based on helpful comments from Gray and jontejj, I've got the following code working...
SocketTask
public class SocketTask implements Runnable {
private String workDetails;
private static final ThreadLocal<Socket> threadLocal =
new ThreadLocal<Socket>(){
@Override
protected Socket initialValue(){
return new Socket();
}
};
public SocketTask(String details){
this.workDetails = details;
}
public void run(){
Socket s = getSocket(); //gets from threadlocal
//send data on socket based on workDetails, etc.
}
public static Socket getSocket(){
return threadLocal.get();
}
}
ExecutorService
ExecutorService threadPool =
Executors.newFixedThreadPool(5, Executors.defaultThreadFactory());
int tasks = 15;
for( int i = 1; i <= tasks; i++){
threadPool.execute(new SocketTask("foobar-" + i));
}
I like this approach for several reasons...
One idea would be to put the Sockets in a BlockingQueue. Then whenever you need a Socket, your threads can take() one from the queue, and when they are done with the Socket they put() it back on the queue.
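public void run() {
    try {
        Socket socket = socketQueue.take();
        try {
            // use the socket ...
        } finally {
            socketQueue.put(socket);
        }
    } catch (InterruptedException e) {
        // take() and put() both throw InterruptedException, which run() cannot declare
        Thread.currentThread().interrupt();
    }
}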
This has the added benefits:
... ExecutorService code.
When the ExecutorService completes, you can shut down your sockets by just dequeueing them and closing them.
This does add the additional overhead of another BlockingQueue, but if you are doing Socket communications, you won't notice it.
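To make the queue idea concrete, here is a minimal sketch of a socket pool built on a BlockingQueue. The names QueuedSocketPool, PooledSocketTask, borrow(), giveBack(), and closeAll() are hypothetical and not from the answer above, and the caller is assumed to create and connect the sockets before handing them over.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: a fixed set of sockets handed out through a BlockingQueue,
// so each socket is held by at most one thread at a time.
final class QueuedSocketPool {
    private final BlockingQueue<Socket> socketQueue;

    // Assumption: the caller has already created (and connected) the sockets.
    QueuedSocketPool(List<Socket> sockets) {
        this.socketQueue = new LinkedBlockingQueue<>(sockets);
    }

    // Blocks until some socket is idle.
    Socket borrow() throws InterruptedException {
        return socketQueue.take();
    }

    // Makes the socket available to other tasks again.
    void giveBack(Socket socket) {
        // The queue is unbounded, so offer() does not block or fail here.
        socketQueue.offer(socket);
    }

    // Number of sockets currently idle in the queue.
    int available() {
        return socketQueue.size();
    }

    // Dequeues every idle socket and closes it; returns how many were closed.
    int closeAll() {
        List<Socket> idle = new ArrayList<>();
        socketQueue.drainTo(idle);
        for (Socket socket : idle) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // nothing useful to do while shutting down
            }
        }
        return idle.size();
    }
}

// Hypothetical task: borrows a socket, uses it, and always gives it back.
final class PooledSocketTask implements Runnable {
    private final QueuedSocketPool pool;
    private final String workDetails;

    PooledSocketTask(QueuedSocketPool pool, String workDetails) {
        this.pool = pool;
        this.workDetails = workDetails;
    }

    @Override
    public void run() {
        Socket socket;
        try {
            socket = pool.borrow();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            // send data on the socket based on workDetails ...
        } finally {
            pool.giveBack(socket);
        }
    }
}
```

With this shape the number of sockets and the number of executor threads can differ: tasks simply wait in borrow() when all sockets are busy. Call closeAll() only after the executor has terminated, since it closes just the sockets that are idle in the queue at that moment.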
we don't believe ThreadFactory addresses our needs ...
I think you could make this work if you used thread-locals. Your thread factory would create a thread that first opens the socket, stores it in a thread-local, then calls the Runnable arg, which does all of the work with the socket, dequeuing jobs from the ExecutorService internal queue. Once it is done, the arg.run() method would finish and you could get the socket from the thread-local and close it.
Something like the following. It's a bit messy but you should get the idea.
ExecutorService threadPool =
Executors.newFixedThreadPool(10,
new ThreadFactory() {
public Thread newThread(final Runnable r) {
Thread thread = new Thread(new Runnable() {
public void run() {
openSocketAndStoreInThreadLocal();
// our tasks would then get the socket from the thread-local
r.run();
getSocketFromThreadLocalAndCloseIt();
}
});
return thread;
}
});
So your tasks would implement Runnable and look like:
public class SocketWorker implements Runnable {
private final ThreadLocal<Socket> threadLocal;
public SocketWorker(ThreadLocal<Socket> threadLocal) {
this.threadLocal = threadLocal;
}
public void run() {
Socket socket = threadLocal.get();
// use the socket ...
}
}
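The thread-factory idea can be put together into something compilable roughly as follows. This is only a sketch under assumptions: SocketThreadFactory, its static SOCKET thread-local, and the Supplier used to open the socket are hypothetical names standing in for openSocketAndStoreInThreadLocal() and getSocketFromThreadLocalAndCloseIt() above.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

// Hypothetical factory: every worker thread it creates owns exactly one socket.
final class SocketThreadFactory implements ThreadFactory {
    // Holds the socket that belongs to the current worker thread.
    static final ThreadLocal<Socket> SOCKET = new ThreadLocal<>();

    // Assumption: the supplier creates and connects a socket however the application needs.
    private final Supplier<Socket> socketOpener;

    SocketThreadFactory(Supplier<Socket> socketOpener) {
        this.socketOpener = socketOpener;
    }

    @Override
    public Thread newThread(final Runnable r) {
        return new Thread(() -> {
            // Open the socket once, when the worker thread starts.
            Socket socket = socketOpener.get();
            SOCKET.set(socket);
            try {
                // r is the pool's worker loop; tasks it runs read SOCKET.get().
                r.run();
            } finally {
                // The worker loop has ended, so release the socket.
                SOCKET.remove();
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // nothing useful to do while the thread is exiting
                }
            }
        });
    }
}
```

It would be passed as the second argument to Executors.newFixedThreadPool, and a task would call SocketThreadFactory.SOCKET.get() inside run(). Note that r.run() returns only when the pool retires the worker thread, so the socket is closed at thread exit, which may be slightly after awaitTermination() returns.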
I think you should use a ThreadLocal
package com.stackoverflow.q16680096;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main
{
public static void main(String[] args)
{
ExecutorService pool = Executors.newCachedThreadPool();
int nrOfConcurrentUsers = 100;
for(int i = 0; i < nrOfConcurrentUsers; i++)
{
pool.submit(new InitSocketTask());
}
// do stuff...
pool.submit(new Task());
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class InitSocketTask implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do initial setup here
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public final class SocketPool
{
private static final ThreadLocal<Socket> SOCKETS = new ThreadLocal<Socket>(){
@Override
protected Socket initialValue()
{
return new Socket(); // Pass in suitable arguments here...
}
};
public static Socket get()
{
return SOCKETS.get();
}
}
package com.stackoverflow.q16680096;
import java.net.Socket;
public class Task implements Runnable
{
public void run()
{
Socket socket = SocketPool.get();
// Do stuff with socket...
}
}
This way, each thread gets its own socket.