
Share common data between two threads serving a Socket connection

I saw plenty of similar questions on SO but hardly any of them have Socket in the picture. So please take time to read the question.

I have a server app (using ServerSocket) that listens for requests. When a client attempts to connect, a new thread is created to serve that client (and the server goes back to listening for new requests). Now I need to respond to one client based on what another client sent to the server.

Example:

  • ServerSocket listening for incoming connections.
  • Client A connects, new thread is created to serve A.
  • Client B connects, new thread is created to serve B.
  • A sends message "Hello from A" to the Server.
  • Send this message as a response to Client B.

I'm new to this whole "inter-thread communication" thing. Obviously, the situation above sounds dead simple, but I'm describing it to get a hint, as I'll be exchanging a huge amount of data among clients, with the server as the intermediary.

Also, what if I want to limit a shared object to, say, 10 particular clients, such that when the 11th client connects to the server, I create a new shared object that is used to exchange data between the 11th, 12th, 13th, ... up to the 20th client? And so on for every set of 10 clients.

What I tried: (foolish I guess)

  • I have a public class in which the object to be shared is declared public static, so that I can use it as a global without instantiating the class, like MyGlobalClass.SharedMsg.
  • That doesn't work: I was unable to send data received in one thread to the other.

I'm aware that there is an obvious locking problem: if one thread is writing to an object, the other can't access it until the first thread is done writing.

So what would be an ideal approach to this problem?

Update

Given the way I create threads to serve incoming connection requests, I can't see how to share the same object among the threads, since using a global object as described above doesn't work.

Following is how I listen for incoming connections and create serving threads dynamically.

// Method of server class
public void startServer()
{
    if (!isRunning)
    {
        try
        {
            isRunning = true;
            while (isRunning)
            {
                try
                {
                    new ClientHandler(mysocketserver.accept()).start();
                }
                catch (SocketTimeoutException ex)
                {
                    //nothing to perform here, go back again to listening.
                }
                catch (SocketException ex)
                {
                    //Nothing to handle: I stop the server using ServerSocket's close() method, and it's going to throw SocketException anyway.
                }
            }
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
    else
        System.out.println("Server Already Started!");
}

And the ClientHandler class.

public class ClientHandler extends Thread
{
    private Socket client = null;
    private ObjectInputStream in = null;
    private ObjectOutputStream out = null;

    public ClientHandler(Socket client)
    {
        super("ClientHandler");
        this.client = client;
    }

    //This run() is common for every Client that connects, and that's where the problem is.
    public void run()
    {
        try
        {
            in = new ObjectInputStream(client.getInputStream());
            out = new ObjectOutputStream(client.getOutputStream());

            //Message received from this thread.
            String msg = in.readObject().toString();
            System.out.println("Client @ "+ client.getInetAddress().getHostAddress() +" Says : "+msg);


            //Response to this client.
            out.writeObject("Message Received");

            out.close();
            in.close();
            client.close();
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }
    }
}

I believe that, given the way I create threads dynamically to serve each client that connects, sharing the same data source through a global object is not possible: the body of run() above is exactly the same for every client, so this same method is both consumer and producer. What changes should I make so that I can create a thread for each connection and still share the same object?

Kushal asked Jun 08 '12 06:06




1 Answer

You probably want a queue for communication between each pair of clients. Each queue will be the 'pipeline' for data pushed from one client to the other.

You would use it like so (pseudocode):

Thread 1:
Receive request from Client A, with message for Client B
Put message on back of concurrent Queue A2B
Respond to Client A.

Thread 2:
Receive request from Client B.
Pop message from front of Queue A2B
Respond to Client B with message.
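
The pseudocode above could be sketched in Java roughly as follows. This is a minimal illustration, not the asker's actual handler: the class name QueueHandoffSketch, the method relay, the capacity of 16, and the 5-second timeout are all assumptions, and the socket reads and writes are replaced by plain values so that only the hand-off between the two threads is shown.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; sockets are left out so the hand-off is visible.
class QueueHandoffSketch {

    // Starts a producer ("Thread 1") and a consumer ("Thread 2") and
    // returns whatever the consumer received for Client B.
    static String relay(String messageFromA) {
        // The shared A2B "pipeline" (capacity 16 is an arbitrary choice).
        BlockingQueue<String> a2b = new ArrayBlockingQueue<>(16);
        String[] deliveredToB = new String[1];

        Thread handlerA = new Thread(() -> {
            try {
                // In a real handler this value would come from in.readObject().
                a2b.put(messageFromA);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ClientHandler-A");

        Thread handlerB = new Thread(() -> {
            try {
                // Waits up to 5 seconds; null means nothing arrived in time.
                // In a real handler the result would go to out.writeObject(...).
                deliveredToB[0] = a2b.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ClientHandler-B");

        handlerB.start();
        handlerA.start();
        try {
            // join() also makes the consumer's write to deliveredToB visible here.
            handlerA.join();
            handlerB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deliveredToB[0];
    }
}
```

Calling QueueHandoffSketch.relay("Hello from A") hands the string from the first thread to the second. The queue does the locking, so neither thread needs its own synchronized block.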

You might also want it to be generic, so that you have an AllToB queue that many clients (and thus many threads) can write to.

Classes of note: ConcurrentLinkedQueue, ArrayBlockingQueue.

If you want to limit the number of messages, then ArrayBlockingQueue with its capacity constructor allows you to do this. If you don't need the blocking functionality, you can use the methods offer and poll rather than put and take.
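
As a small sketch of the difference, assuming a queue capacity of 2 and a hypothetical class named BoundedQueueSketch: offer and poll return immediately with a status value, whereas put and take would block the calling thread until space or a message becomes available.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class showing the non-blocking methods on a bounded queue.
class BoundedQueueSketch {

    static String demo() {
        // The capacity constructor caps the queue at 2 pending messages.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        boolean first = queue.offer("one");   // accepted
        boolean second = queue.offer("two");  // accepted, queue is now full
        boolean third = queue.offer("three"); // rejected at once; put() would block here

        String head = queue.poll();           // oldest message first (FIFO)
        queue.poll();                         // removes "two"
        String empty = queue.poll();          // null when empty; take() would block here

        return first + "," + second + "," + third + "," + head + "," + empty;
    }
}
```

BoundedQueueSketch.demo() returns "true,true,false,one,null".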

I wouldn't worry about sharing the queues; it makes the problem significantly more complicated. Only do this if you know there is a memory usage problem you need to address.

EDIT: Based on your update:

If you need to share a single instance between all dynamically created instances, you can either:

  1. Make a static instance.
  2. Pass it into the constructor.

Example of 1:

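import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class ClientHandler extends Thread
{
  // One inbox per handler, shared by every ClientHandler instance.
  public static final Map<ClientHandler, BlockingQueue<String>> messageQueues
    = new ConcurrentHashMap<>();

  <snip>

  public ClientHandler(Socket client)
  {
    super("ClientHandler");
    this.client = client;
    // Note: It is bad practice to let 'this' escape from a constructor, because
    // other threads can then see the object before it is fully constructed.
    // If you do this anyway, keep it as the last statement of the constructor.
    // ArrayBlockingQueue requires a capacity; 100 is an arbitrary choice.
    messageQueues.put(this, new ArrayBlockingQueue<>(100));
  }

  // You can now access this in the run() method like so:
  // Get messages for the current client.
  // messageQueues.get(this).poll();
  // Send messages to the thread for another client.
  // messageQueues.get(someClient).offer(message);
}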

A couple of notes:

  • The messageQueues object should really contain some sort of identifier for the client rather than an object reference that is short lived.
  • A more testable design would pass the messageQueues object into the constructor to allow mocking.
  • I would probably recommend using a wrapper class for the map, so you can just call offer with 2 parameters rather than having to worry about the map semantics.
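
The notes above could be combined into something like the following sketch. Everything here is an assumption made for illustration: the class name MessageRouter, its method names, the use of String client IDs as keys, and the choice of an unbounded LinkedBlockingQueue for each inbox.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper around the map of queues, keyed by a client ID
// instead of a short-lived ClientHandler reference.
class MessageRouter {
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    // Creates the inbox for a client if it does not exist yet.
    void register(String clientId) {
        queues.computeIfAbsent(clientId, id -> new LinkedBlockingQueue<>());
    }

    // Drops the inbox, for example when the client disconnects.
    void unregister(String clientId) {
        queues.remove(clientId);
    }

    // Queues a message for the given client; returns false if the client is unknown.
    boolean offer(String toClientId, String message) {
        BlockingQueue<String> inbox = queues.get(toClientId);
        return inbox != null && inbox.offer(message);
    }

    // Returns the next message for the client, or null if none is waiting.
    String poll(String clientId) {
        BlockingQueue<String> inbox = queues.get(clientId);
        return inbox == null ? null : inbox.poll();
    }
}
```

The server would create one MessageRouter and pass it to each handler, for example through a hypothetical constructor such as ClientHandler(Socket client, String clientId, MessageRouter router). A test can then hand the handler a router it controls instead of relying on a static field.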
Bringer128 answered Oct 12 '22 01:10