Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Issues with Asynchronous Reads in .NET - it seems that my callback is being called multiple times when it should not be

Basically I'm writing some code for a system that is designed to be very fast, small, and robust. I started off with some asynchronous examples for TcpListener and TcpClient and wrote a Server and Client class that is basically used in multiple places in my project.

Basically my server class (code will be below later) is all event based, and so is the client code. When I get packets - one by one - coming in through either a server or client socket - everything works fine.

However, if the sender - for example class A uses Client Class - sends a bunch of packets through a TCP Stream to the Server class in Class B.. Naturally the server class may get all the packets as one big lump sump. So when the callback for data recieved event occurs, I grab the buffer and then process it.

And here's where something funny happens. My issue isn't splitting all the packets out from a big buffer. My problem is that for some reason that I cannot understand.. Lets say I send 5 packets from Client to Server (or vice versa) and the other side gets all 5. The datarecieve event triggers, and then the bugger is grabbed and all 5 packets are in there. They are processed. But then the event triggers again..

In other words, instead of the event triggering one time, it triggers 5 times for 5 individual packets, and I end up processing a buffer containing those 5 packets 5 times.

And since I'm designing a distributed network, that means the node that the module talks to (module (Client Class) <--> node (Server Class) <--> client (Client Class)) gets 25 packets instead of 5. And then it forwards those on to the destination, which gets 25*5 or 125 packets.

I'm pretty sure I'm missing something obvious here. I've tried thinking of ways to make the event fire just once.. And I might end up tossing in my towel and rewrite the Server and Client classes so that they will be synchronous and have a thread per client instance (or on server side, a thread that does accepts, and a thread per client connection) - so that way I can handle the data flow better. I.e. packet comes in, if its whole, process. If its not, wait for it to be whole, etc.. using the typical start/end special bytes and so forth.

Server Class - most of it is there. Took out some of the irrevelant ones like KillClient, etc.

   public class Server
{
    private TcpListener serverListener;
    private List<ServerClient> clients;

    #region Callbacks


    public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
    public incomingDataCallback incomingData = null;

    public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
    public incomingConnectionCallback incomingConnection = null;

    public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
    public connectionClosedCallback connectionClosed = null;

    public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
    public dataWrittenCallback dataWritten = null;


    #endregion

    // Constructor
    public Server(string listenIP, int listenPort)
    {
        // Create a new instance of serverlistener.
        serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
        this.clients = new List<ServerClient>();
        this.Encoding = Encoding.Default;
    }

    ~Server()
    {
        // Shut down the server.
        this.Stop();
    }

    public Encoding Encoding { get; set; }

    public IEnumerable<TcpClient> TcpClients
    {
        get
        {
            foreach (ServerClient client in this.clients)
            {
                yield return client.TcpClient;
            }
        }
    }

    public IEnumerable<TcpClient> TcpClients
    {
        get
        {
            foreach (ServerClient client in this.clients)
            {
                yield return client.TcpClient;
            }
        }
    }

    public void Stop()
    {
        this.serverListener.Stop();
        lock (this.clients)
        {
            foreach (ServerClient client in this.clients)
            {
                client.TcpClient.Client.Disconnect(false);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
            }
            this.clients.Clear();
        }
    }

    public void WriteToClient(TcpClient tcpClient, byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        try
        {
            networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
        }
        catch (System.IO.IOException ex)
        {
            // Port was closed before data could be written. 
            // So remove this guy from clients.
            lock (this.clients)
            {
                foreach (ServerClient cl in clients)
                {
                    if (cl.TcpClient.Equals(tcpClient))
                    {
                        this.clients.Remove(cl);
                        if (connectionClosed != null)
                            connectionClosed(cl.ID, cl.TcpClient);
                        break;
                    }
                }
            }

        }
    }

    private void WriteCallback(IAsyncResult result)
    {
        TcpClient tcpClient = result.AsyncState as TcpClient;
        NetworkStream networkStream = tcpClient.GetStream();
        networkStream.EndWrite(result);

        // Get the ID and return it
        //ServerClient client = result.AsyncState as ServerClient;

        //string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();


        Console.WriteLine("Write callback called for: " + port);

        //                if (dataWritten != null)
        //                  dataWritten(client.ID, tcpClient);
    }

    private void AcceptTcpClientCallback(IAsyncResult result)
    {
        TcpClient tcpClient;

        try
        {
            tcpClient = serverListener.EndAcceptTcpClient(result);
        }
        catch
        {
            // Often get this error when shutting down the server
            return;
        }

        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];

        // Get the IP Address.. this will be used for id purposes. 
        string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();

        // Create a client object for this client.
        ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);

        Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
        Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());

        // Lock the list and add it in.
        lock (this.clients)
        {
            this.clients.Add(client);
        }

        if (networkStream.DataAvailable)
        {

            int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
            Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
            ReadHandle(client, read, networkStream);

        }
        else
        {

            Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
            networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            //networkStream.

            Console.WriteLine("Data availiable: " + networkStream.DataAvailable.ToString());
            Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
        }

        Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
        serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);

        // Notify owner that new connection came in
        if (incomingConnection != null)
            incomingConnection(client.ID, tcpClient);
    }

    private void ReadCallback(IAsyncResult result)
    {
        ServerClient client = result.AsyncState as ServerClient;


        if (client == null)
        {
            Console.WriteLine("ReadCallback: Null client");
            return;
        }

        int read = 0;

        NetworkStream networkStream = client.NetworkStream;
        try
        {
            read = networkStream.EndRead(result);
        }
        catch (System.IO.IOException ex)
        {
            Console.WriteLine("ReadCallback: Exception occured during reading.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }

        }

        ReadHandle(client, read, networkStream);
    }

    private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
    {



        // If zero bytes read, then client disconnected.
        if (read == 0)
        {
            Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }

        //string data = this.Encoding.GetString(client.Buffer, 0, read);

        // Do something with the data object here.
        if (incomingData != null)
            incomingData(client.Buffer, client.ID, client.TcpClient);

        // Go back to accepting data from client.
        try
        {
          networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
        }
        catch (Exception ex)
        {
            // Damn, we just lost the client.
            Console.WriteLine("ReadHandle: Exception occured during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }

    }
}

internal class ServerClient
{
    public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
    {
        if (tcpClient == null) throw new ArgumentNullException("tcpClient");
        if (buffer == null) throw new ArgumentNullException("tcpClient");
        if (ipaddr == null) throw new ArgumentNullException("tcpClient");

        this.TcpClient = tcpClient;
        this.Buffer = buffer;
        this.ID = ipaddr;
    }

    public TcpClient TcpClient { get; private set; }
    public byte[] Buffer { get; private set; }
    public string ID { get; private set; }
    public NetworkStream NetworkStream
    {
        get
        {
            return TcpClient.GetStream();
        }
    }
}
}

And here's the client class - it's smaller and simple compared to the server.

public class Client
{
    private IPAddress address;
    private int port;
    private string ID;

    //private WaitHandle addressSet;
    private TcpClient tcpClient;
    private int failedConnectionCount;

    public bool keepOnTrying = false;

    #region Callbacks

    public delegate void incomingDataCallback(byte[] buffer, string serverID);
    public incomingDataCallback incomingData = null;


    public delegate void connectedCallback(string serverID);
    public connectedCallback clientConnected = null;

    public delegate void connectionFailedCallback(string serverID);
    public connectionFailedCallback clientConnectionFailed = null;

    public delegate void connectionClosedCallback(string serverID);
    public connectionClosedCallback connectionClosed = null;

    public delegate void dataWrittenCallback(string serverID);
    public dataWrittenCallback dataWritten = null;

    #endregion

    public Client(IPAddress address, int port)
    {
        this.address = address;

        if (port < 0) throw new ArgumentException();

        this.port = port;
        this.tcpClient = new TcpClient();
        this.Encoding = Encoding.Default;
        this.ID = address.ToString() + ":" + port.ToString();

        tcpClient.ReceiveBufferSize = 16384;
        tcpClient.SendBufferSize = 16384;
    }

    // Destructor
    ~Client()
    {
        this.Disconnect();
    }

    public Encoding Encoding { get; set; }


    public void Connect()
    {
        tcpClient.BeginConnect(address, port, ConnectCallback, null);
    }

    public void Disconnect()
    {
        tcpClient.Close();
        if (connectionClosed != null)
            connectionClosed(ID);
    }

    public void Write(byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
    }

    private void WriteCallback(IAsyncResult result)
    {
        NetworkStream networkStream = tcpClient.GetStream();

        if (tcpClient.Connected)
        {
            networkStream.EndWrite(result);
        }

        if (dataWritten != null)
            dataWritten(ID);
    }

    private void ConnectCallback(IAsyncResult result)
    {
        // Check to see if connected successfully or not. If we didnt, then the try/catch block will increment
        // the failed connection count.
        try
        {
            tcpClient.EndConnect(result);
        }
        catch
        {
            Interlocked.Increment(ref failedConnectionCount);
            if (keepOnTrying)
                tcpClient.BeginConnect(address, port, ConnectCallback, null);

            if (clientConnectionFailed != null)
                clientConnectionFailed(ID);

            return;
        }

        // Connected successfully.
        // Now begin async read operation.

        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);

        if (clientConnected != null)
            clientConnected(ID);
    }

    private void ReadCallback(IAsyncResult result)
    {
        int read;
        NetworkStream networkStream;

        try
        {
            networkStream = tcpClient.GetStream();
            read = networkStream.EndRead(result);

        }
        catch
        {
            // An error has occured when reading.. -.-
            Console.WriteLine("Error occured while reading for ID: " + ID);
            return;
        }



        // If read is 0, then connection was closed

        if (read == 0)
        {
            if (connectionClosed != null)
                connectionClosed(ID);
            return;
        }

        if (result.IsCompleted == false)
        {
            Console.WriteLine("Uh oh ");
        }

        byte[] buffer = result.AsyncState as byte[];

        if (incomingData != null)
            incomingData(buffer, ID);

        // Then begin reading again.
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
    }

}

And the way I use those classes is this:

  1. Create a Class and then make an object of either server or client.
  2. Tie in all the callbacks. I.e. create functions in your class for each of the callbacks.
  3. Call server start, or client connect. Depending on which you are using.

So to replicate my problem, do this:

  1. Make a server class in one program, and client in another. Have the client connect to the server.
  2. Do a callback for the data coming in. I use serialization, so you could do something similiar.
  3. Have the client send a bunch of data to the server all at once. For me, I'm converting JSON data into my own format in my Module, and then I send it to the server. So the server gets a bunch of packets at once.
  4. Should be seeing - if its fast enough - that the server will get all of the packets into the receiving buffer AND every time it calls the incomingDataCallback - you will have a buffer with all the packets in it. And it will call that for every packet received. Not byte, whole packets.

So before I go and rewrite the code to be synchronous and run in a thread(s):

  1. Is there anything I could do different / better to make it so that way when the data comes in - either it calls the event once and I can process all the packets in the buffer - or -
  2. Is there a way to make sure that any other events that get called will not share the same buffer as the initial one? I know its a waste of processor time - but I can have an "if first 10 bytes are 00, return" line in my incomingDataCallback handler. That's why I was thinking of making buffer all nulled out in the first event and detect those on the subsequent events.

Update: Due to Servy's comment - here's how I use these classes. Not c/p'ing everything, just the relevant parts.

Node - uses Server class.

class ModuleClient
{
    private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
    private Server myServer = null;

    public ModuleClient()
    {
        // create a server object
        myServer = new Server("127.0.0.1", 9000);

        // Attach callbacks
        myServer.connectionClosed = connClosed;
        myServer.dataWritten = dataWritten;
        myServer.incomingConnection = incomingConn;
        myServer.incomingData = incomingData;
    }

    public void startListeningForModules()
    {
        if (!listeningForModules)
            myServer.Start();
        else
            return;

        listeningForModules = true;
    }

    private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
    {
        Console.WriteLine("Incoming Data from " + clientID);

        incomingPacketStruct newPacket = new incomingPacketStruct();
        newPacket.clientID = clientID;
        newPacket.buffer = buffer;
        newPacket.tcpClient = tcpClient;
    }

Its in incomingData that I notice I'm having 5 packets in the buffer, and then incomingData is called 5 times.

Now as for the client's incomingData (keep in mind I have not noticed this behavior in outgoing data, nor is it relevant. Lets say I get 10 json packets all at once, I will send them on to the node - so thats 10 writes. The node will get them all in the same buffer, which then will call the server's incoming data 10 times and each time, it will see 10 packets.

Client's incoming data:

public partial class Program : ServiceBase
{
   // Globals
    private static SocketObject.Client myHermesClient = null;
    private static JSONInterface myJsonInterface = null;

    private static void mainThread(object data)
    {

        // Take care of client and callbacks..
        myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
        myHermesClient.connectionClosed = hermesConnectionClosed;
        myHermesClient.clientConnected = hermesConnected;
        myHermesClient.dataWritten = hermesDataWritten;
        myHermesClient.incomingData = hermesIncomingData;
        myHermesClient.clientConnectionFailed = hermesConnectionFailed;

        myHermesClient.keepOnTrying = true;

        // Begin async connect
        myHermesClient.Connect();


        // Main loop for service.
        while (serviceRunning)
        {
            Thread.Sleep(500);
        }

    }

    #region Hermes Client Code
    private static void hermesIncomingData(byte[] buffer, string serverID)
    {

    }

And again, same thing. When the server sends a lot of data back to the client.. If you break and look at the buffer, you will see what I'm talking about.

Now, want to make this clear.. My issue is NOT breaking up the packets. I have code (not included because proprietary, and not relevant to this - it does not modify buffer, only creates a list of objects from it) - but the issue is the callback being called multiple times as stated above.

like image 682
fireether Avatar asked Mar 02 '13 04:03

fireether


1 Answers

Inside private void ReadCallback(IAsyncResult result)

ReadHandle(client, read, networkStream); 

Then inside ReadHandle() you setup the call back again.

like image 151
playerone Avatar answered Sep 29 '22 19:09

playerone