I'm writing some code for a system that is designed to be very fast, small, and robust. I started off with some asynchronous examples for TcpListener and TcpClient and wrote a Server class and a Client class that are used in multiple places in my project.
My Server class (code below) is entirely event based, and so is the Client class. When packets come in one at a time through either a server or client socket, everything works fine.
However, suppose the sender (for example, class A using the Client class) sends a bunch of packets over a TCP stream to the Server class in class B. Naturally, the Server class may receive all the packets as one big lump. When the callback for the data-received event fires, I grab the buffer and process it.
And here's where something funny happens. My issue isn't splitting the packets out of a big buffer. My problem is something I cannot understand: let's say I send 5 packets from Client to Server (or vice versa) and the other side gets all 5. The data-received event triggers, the buffer is grabbed, and all 5 packets are in there. They are processed. But then the event triggers again.
In other words, instead of the event triggering one time, it triggers 5 times for 5 individual packets, and I end up processing a buffer containing those 5 packets 5 times.
And since I'm designing a distributed network, that means the node that the module talks to (module (Client class) <--> node (Server class) <--> client (Client class)) gets 25 packets instead of 5. It then forwards those on to the destination, which gets 25*5 or 125 packets.
I'm pretty sure I'm missing something obvious here. I've tried thinking of ways to make the event fire just once. I might end up throwing in the towel and rewriting the Server and Client classes to be synchronous, with a thread per client instance (or, on the server side, one thread that accepts and one thread per client connection), so that I can handle the data flow better: a packet comes in; if it's whole, process it; if it's not, wait for it to be whole, and so on, using the typical start/end special bytes and so forth.
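For reference, the "wait until the packet is whole" idea with start/end bytes could be sketched roughly as below. This is only an illustration under assumptions: the FrameAssembler class, its Append method, and the 0x02/0x03 marker values are hypothetical and not part of the project, and the payload is assumed never to contain either marker byte.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the project): accumulates raw bytes from
// successive reads and returns only complete frames.
public sealed class FrameAssembler
{
    private const byte Start = 0x02; // assumed start-of-frame marker
    private const byte End = 0x03;   // assumed end-of-frame marker

    private readonly List<byte> pending = new List<byte>();

    // Appends the first `count` bytes of `buffer` and returns every frame
    // that is now complete. A trailing partial frame stays buffered.
    public List<byte[]> Append(byte[] buffer, int count)
    {
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (count < 0 || count > buffer.Length) throw new ArgumentOutOfRangeException("count");

        for (int i = 0; i < count; i++)
            pending.Add(buffer[i]);

        var frames = new List<byte[]>();
        while (true)
        {
            int start = pending.IndexOf(Start);
            if (start < 0)
            {
                // No start marker at all: nothing usable is buffered.
                pending.Clear();
                break;
            }
            int end = pending.IndexOf(End, start + 1);
            if (end < 0)
            {
                // Frame has started but not finished: keep it for the next read.
                pending.RemoveRange(0, start);
                break;
            }
            // Payload is everything between the two markers.
            frames.Add(pending.GetRange(start + 1, end - start - 1).ToArray());
            pending.RemoveRange(0, end + 1);
        }
        return frames;
    }
}
```

One instance per connection would be fed from each completed read, for example `assembler.Append(buffer, read)`, where `read` is the byte count returned by the read call. Passing the count matters because only the first `read` bytes of the buffer belong to the current read.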
Server class - most of it is there. I took out some of the irrelevant methods, like KillClient.
public class Server
{
    private TcpListener serverListener;
    private List<ServerClient> clients;
    #region Callbacks
    public delegate void incomingDataCallback(byte[] buffer, string clientID, TcpClient tcpClient);
    public incomingDataCallback incomingData = null;
    public delegate void incomingConnectionCallback(string clientID, TcpClient tcpClient);
    public incomingConnectionCallback incomingConnection = null;
    public delegate void connectionClosedCallback(string clientID, TcpClient tcpClient);
    public connectionClosedCallback connectionClosed = null;
    public delegate void dataWrittenCallback(string clientID, TcpClient tcpClient);
    public dataWrittenCallback dataWritten = null;
    #endregion
    // Constructor
    public Server(string listenIP, int listenPort)
    {
        // Create a new instance of serverlistener.
        serverListener = new TcpListener(IPAddress.Parse(listenIP), listenPort);
        this.clients = new List<ServerClient>();
        this.Encoding = Encoding.Default;
    }
    ~Server()
    {
        // Shut down the server.
        this.Stop();
    }
    public Encoding Encoding { get; set; }
    public IEnumerable<TcpClient> TcpClients
    {
        get
        {
            foreach (ServerClient client in this.clients)
            {
                yield return client.TcpClient;
            }
        }
    }
    public void Stop()
    {
        this.serverListener.Stop();
        lock (this.clients)
        {
            foreach (ServerClient client in this.clients)
            {
                client.TcpClient.Client.Disconnect(false);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
            }
            this.clients.Clear();
        }
    }
    public void WriteToClient(TcpClient tcpClient, byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();
        try
        {
            networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, tcpClient);
        }
        catch (System.IO.IOException ex)
        {
            // Port was closed before data could be written.
            // So remove this guy from clients.
            lock (this.clients)
            {
                foreach (ServerClient cl in clients)
                {
                    if (cl.TcpClient.Equals(tcpClient))
                    {
                        this.clients.Remove(cl);
                        if (connectionClosed != null)
                            connectionClosed(cl.ID, cl.TcpClient);
                        break;
                    }
                }
            }
        }
    }
    private void WriteCallback(IAsyncResult result)
    {
        TcpClient tcpClient = result.AsyncState as TcpClient;
        NetworkStream networkStream = tcpClient.GetStream();
        networkStream.EndWrite(result);
        // Get the ID and return it
        //ServerClient client = result.AsyncState as ServerClient;
        //string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
        Console.WriteLine("Write callback called for: " + port);
        // if (dataWritten != null)
        //     dataWritten(client.ID, tcpClient);
    }
    private void AcceptTcpClientCallback(IAsyncResult result)
    {
        TcpClient tcpClient;
        try
        {
            tcpClient = serverListener.EndAcceptTcpClient(result);
        }
        catch
        {
            // Often get this error when shutting down the server
            return;
        }
        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
        // Get the IP Address.. this will be used for id purposes.
        string ipaddr = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Address.ToString();
        string port = ((IPEndPoint)(tcpClient.Client.RemoteEndPoint)).Port.ToString();
        // Create a client object for this client.
        ServerClient client = new ServerClient(tcpClient, buffer, ipaddr + ":" + port);
        Console.WriteLine("Data available: " + networkStream.DataAvailable.ToString());
        Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
        // Lock the list and add it in.
        lock (this.clients)
        {
            this.clients.Add(client);
        }
        if (networkStream.DataAvailable)
        {
            int read = networkStream.Read(client.Buffer, 0, client.Buffer.Length);
            Console.WriteLine("Calling ReadHandle directly with " + read.ToString() + " number of bytes. for clientid: " + client.ID);
            ReadHandle(client, read, networkStream);
        }
        else
        {
            Console.WriteLine("Started beginRead for client in accept connection: " + client.ID);
            networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            //networkStream.
            Console.WriteLine("Data available: " + networkStream.DataAvailable.ToString());
            Console.WriteLine("Amount of data: " + tcpClient.Available.ToString());
        }
        Console.WriteLine("Starting BeginAcceptTcpClient again - client: " + client.ID);
        serverListener.BeginAcceptTcpClient(AcceptTcpClientCallback, null);
        // Notify owner that new connection came in
        if (incomingConnection != null)
            incomingConnection(client.ID, tcpClient);
    }
    private void ReadCallback(IAsyncResult result)
    {
        ServerClient client = result.AsyncState as ServerClient;
        if (client == null)
        {
            Console.WriteLine("ReadCallback: Null client");
            return;
        }
        int read = 0;
        NetworkStream networkStream = client.NetworkStream;
        try
        {
            read = networkStream.EndRead(result);
        }
        catch (System.IO.IOException ex)
        {
            Console.WriteLine("ReadCallback: Exception occurred during reading.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }
        ReadHandle(client, read, networkStream);
    }
    private void ReadHandle(ServerClient client, int read, NetworkStream networkStream)
    {
        // If zero bytes read, then client disconnected.
        if (read == 0)
        {
            Console.WriteLine("ReadHandle: Read == 0, closing connection for Client: " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }
        //string data = this.Encoding.GetString(client.Buffer, 0, read);
        // Do something with the data object here.
        if (incomingData != null)
            incomingData(client.Buffer, client.ID, client.TcpClient);
        // Go back to accepting data from client.
        try
        {
            networkStream.BeginRead(client.Buffer, 0, client.Buffer.Length, ReadCallback, client);
            Console.WriteLine("ReadHandle: BeginRead called for client " + client.ID);
        }
        catch (Exception ex)
        {
            // Damn, we just lost the client.
            Console.WriteLine("ReadHandle: Exception occurred during trying to BeginRead.. Message: " + ex.Message + " client " + client.ID);
            lock (this.clients)
            {
                this.clients.Remove(client);
                if (connectionClosed != null)
                    connectionClosed(client.ID, client.TcpClient);
                return;
            }
        }
    }
}
internal class ServerClient
{
    public ServerClient(TcpClient tcpClient, byte[] buffer, string ipaddr)
    {
        // Report the parameter that is actually null.
        if (tcpClient == null) throw new ArgumentNullException("tcpClient");
        if (buffer == null) throw new ArgumentNullException("buffer");
        if (ipaddr == null) throw new ArgumentNullException("ipaddr");
        this.TcpClient = tcpClient;
        this.Buffer = buffer;
        this.ID = ipaddr;
    }
    public TcpClient TcpClient { get; private set; }
    public byte[] Buffer { get; private set; }
    public string ID { get; private set; }
    public NetworkStream NetworkStream
    {
        get
        {
            return TcpClient.GetStream();
        }
    }
}
}
And here's the client class. It's smaller and simpler than the server.
public class Client
{
    private IPAddress address;
    private int port;
    private string ID;
    //private WaitHandle addressSet;
    private TcpClient tcpClient;
    private int failedConnectionCount;
    public bool keepOnTrying = false;
    #region Callbacks
    public delegate void incomingDataCallback(byte[] buffer, string serverID);
    public incomingDataCallback incomingData = null;
    public delegate void connectedCallback(string serverID);
    public connectedCallback clientConnected = null;
    public delegate void connectionFailedCallback(string serverID);
    public connectionFailedCallback clientConnectionFailed = null;
    public delegate void connectionClosedCallback(string serverID);
    public connectionClosedCallback connectionClosed = null;
    public delegate void dataWrittenCallback(string serverID);
    public dataWrittenCallback dataWritten = null;
    #endregion
    public Client(IPAddress address, int port)
    {
        this.address = address;
        if (port < 0) throw new ArgumentException();
        this.port = port;
        this.tcpClient = new TcpClient();
        this.Encoding = Encoding.Default;
        this.ID = address.ToString() + ":" + port.ToString();
        tcpClient.ReceiveBufferSize = 16384;
        tcpClient.SendBufferSize = 16384;
    }
    // Destructor
    ~Client()
    {
        this.Disconnect();
    }
    public Encoding Encoding { get; set; }
    public void Connect()
    {
        tcpClient.BeginConnect(address, port, ConnectCallback, null);
    }
    public void Disconnect()
    {
        tcpClient.Close();
        if (connectionClosed != null)
            connectionClosed(ID);
    }
    public void Write(byte[] bytes)
    {
        NetworkStream networkStream = tcpClient.GetStream();
        networkStream.BeginWrite(bytes, 0, bytes.Length, WriteCallback, null);
    }
    private void WriteCallback(IAsyncResult result)
    {
        NetworkStream networkStream = tcpClient.GetStream();
        if (tcpClient.Connected)
        {
            networkStream.EndWrite(result);
        }
        if (dataWritten != null)
            dataWritten(ID);
    }
    private void ConnectCallback(IAsyncResult result)
    {
        // Check to see if connected successfully or not. If we didn't, then the try/catch block will increment
        // the failed connection count.
        try
        {
            tcpClient.EndConnect(result);
        }
        catch
        {
            Interlocked.Increment(ref failedConnectionCount);
            if (keepOnTrying)
                tcpClient.BeginConnect(address, port, ConnectCallback, null);
            if (clientConnectionFailed != null)
                clientConnectionFailed(ID);
            return;
        }
        // Connected successfully.
        // Now begin async read operation.
        NetworkStream networkStream = tcpClient.GetStream();
        byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
        if (clientConnected != null)
            clientConnected(ID);
    }
    private void ReadCallback(IAsyncResult result)
    {
        int read;
        NetworkStream networkStream;
        try
        {
            networkStream = tcpClient.GetStream();
            read = networkStream.EndRead(result);
        }
        catch
        {
            // An error has occurred when reading.. -.-
            Console.WriteLine("Error occurred while reading for ID: " + ID);
            return;
        }
        // If read is 0, then connection was closed
        if (read == 0)
        {
            if (connectionClosed != null)
                connectionClosed(ID);
            return;
        }
        if (result.IsCompleted == false)
        {
            Console.WriteLine("Uh oh ");
        }
        byte[] buffer = result.AsyncState as byte[];
        if (incomingData != null)
            incomingData(buffer, ID);
        // Then begin reading again.
        networkStream.BeginRead(buffer, 0, buffer.Length, ReadCallback, buffer);
    }
}
And the way I use those classes is this:
So to replicate my problem, do this:
So before I go and rewrite the code to be synchronous and run in a thread(s):
Update: Due to Servy's comment, here's how I use these classes. I'm not copy/pasting everything, just the relevant parts.
Node - uses Server class.
class ModuleClient
{
    private List<ModuleClientInfo> clients = new List<ModuleClientInfo>();
    private Server myServer = null;
    public ModuleClient()
    {
        // create a server object
        myServer = new Server("127.0.0.1", 9000);
        // Attach callbacks
        myServer.connectionClosed = connClosed;
        myServer.dataWritten = dataWritten;
        myServer.incomingConnection = incomingConn;
        myServer.incomingData = incomingData;
    }
    public void startListeningForModules()
    {
        if (!listeningForModules)
            myServer.Start();
        else
            return;
        listeningForModules = true;
    }
    private void incomingData(byte[] buffer, string clientID, TcpClient tcpClient)
    {
        Console.WriteLine("Incoming Data from " + clientID);
        incomingPacketStruct newPacket = new incomingPacketStruct();
        newPacket.clientID = clientID;
        newPacket.buffer = buffer;
        newPacket.tcpClient = tcpClient;
    }
It's in incomingData that I notice there are 5 packets in the buffer, and then incomingData is called 5 times.
Now as for the client's incomingData: keep in mind I have not noticed this behavior in outgoing data, nor is it relevant. Let's say I get 10 JSON packets all at once; I will send them on to the node, so that's 10 writes. The node will get them all in the same buffer, which will then call the server's incomingData 10 times, and each time it will see 10 packets.
Client's incoming data:
public partial class Program : ServiceBase
{
    // Globals
    private static SocketObject.Client myHermesClient = null;
    private static JSONInterface myJsonInterface = null;
    private static void mainThread(object data)
    {
        // Take care of client and callbacks..
        myHermesClient = new SocketObject.Client(System.Net.IPAddress.Parse("127.0.0.1"), 9000);
        myHermesClient.connectionClosed = hermesConnectionClosed;
        myHermesClient.clientConnected = hermesConnected;
        myHermesClient.dataWritten = hermesDataWritten;
        myHermesClient.incomingData = hermesIncomingData;
        myHermesClient.clientConnectionFailed = hermesConnectionFailed;
        myHermesClient.keepOnTrying = true;
        // Begin async connect
        myHermesClient.Connect();
        // Main loop for service.
        while (serviceRunning)
        {
            Thread.Sleep(500);
        }
    }
    #region Hermes Client Code
    private static void hermesIncomingData(byte[] buffer, string serverID)
    {
    }
And again, it's the same thing when the server sends a lot of data back to the client. If you break and look at the buffer, you will see what I'm talking about.
Now, I want to make this clear: my issue is NOT breaking up the packets. I have code for that (not included because it's proprietary and not relevant to this; it does not modify the buffer, it only creates a list of objects from it). The issue is the callback being called multiple times, as stated above.
Inside private void ReadCallback(IAsyncResult result) you call:
ReadHandle(client, read, networkStream);
Then, inside ReadHandle(), you set up the callback again.
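To make that re-arming pattern concrete, here is a minimal sketch. It assumes a hypothetical ReadLoop class written against Stream instead of the question's Server and NetworkStream, so it can run without sockets, and it leaves out error handling. Because BeginRead is issued again at the end of every callback, the callback runs once per completed read, which is expected. EndRead returns the number of bytes actually read. The question's ReadHandle passes the whole client.Buffer to incomingData without that count, so a handler that parses the full buffer can see bytes left over from an earlier, larger read. Whether that is what produces the repeated packets here is an assumption and not something the post confirms.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical stand-in for the Server read path (not the question's class).
public sealed class ReadLoop
{
    private readonly Stream stream;
    private readonly byte[] buffer;
    private readonly Action<byte[]> onData; // receives exactly the bytes of one read

    // Signaled when a read returns 0 bytes (end of stream / connection closed).
    public readonly ManualResetEventSlim Closed = new ManualResetEventSlim(false);

    public ReadLoop(Stream stream, int bufferSize, Action<byte[]> onData)
    {
        if (stream == null) throw new ArgumentNullException("stream");
        if (onData == null) throw new ArgumentNullException("onData");
        this.stream = stream;
        this.buffer = new byte[bufferSize];
        this.onData = onData;
    }

    public void Start()
    {
        stream.BeginRead(buffer, 0, buffer.Length, ReadCallback, null);
    }

    private void ReadCallback(IAsyncResult result)
    {
        int read = stream.EndRead(result);
        if (read == 0)
        {
            Closed.Set();
            return;
        }

        // Copy only the bytes this read produced. The rest of `buffer` may
        // still hold data from an earlier, larger read.
        byte[] chunk = new byte[read];
        Array.Copy(buffer, 0, chunk, 0, read);
        onData(chunk);

        // Re-arm the read: this is why the callback fires once per completed read.
        stream.BeginRead(buffer, 0, buffer.Length, ReadCallback, null);
    }
}
```

With a 10-byte MemoryStream and a 4-byte buffer, the handler is invoked three times and receives 4, 4, and 2 bytes. The last chunk never contains the two stale bytes that remain in the shared buffer from the previous read.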