I'm new to Rx, so I'm probably making some essential mistakes here.
I wanted to create a very simple socket server that could receiver messages from clients using Observables. For this I'm using Rxx, which provides extension methods in the System.Net.Sockets namespace, and also provides the ObserableTcpListener static factory class.
Here's what I have so far, pretty much stole it from various sources:
IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket));
.Finally(listener.Stop)
clients.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
TcpClient client = new TcpClient();
client.Client = socket;
return Observable.Return<TcpClient>(client);
}
private static IObservable<byte[]> OnConnect(TcpClient client)
{
return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}
private static void OnMessage(TcpClient client, byte[] message)
{
Console.WriteLine("Mesage Received! - {0}", Encoding.UTF8.GetString(message));
}
private static void OnCompleted(TcpClient client)
{
Console.WriteLine("Completed.");
}
private static void OnException(TcpClient client, Exception ex)
{
Console.WriteLine("Exception: {0}", ex.ToString());
}
This works... up to a point. I can make a single Client connection. As soon as that connection terminates, it would seem the Observable sequence terminates and .Finally(listener.Stop)
is called. Obviously, that's not what I want.
I tried using the ObserableTcpListener.Start()
factory class, but that nets me the exact same result.
IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
OnConnect(client).Subscribe(
message => OnMessage(client, message),
ex => OnException(client, ex),
() => OnCompleted(client));
});
I suppose I do understand the problem here: the clients
observable sequence is simply empty after the first client terminates, thus .Finally(listener.Stop)
is called.
What would I need to do to circumvent this? How can I keep listening for incoming connections?
Make your Observable
hot and persist while there are subscriptions.
IObservable<TcpClient> clients = listener
.StartSocketObservable(1)
.SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
.Finally(listener.Stop)
.Publish().RefCount();
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With