Observable TcpListener terminates after single connection

I'm new to Rx, so I'm probably making some essential mistakes here.

I wanted to create a very simple socket server that could receive messages from clients using Observables. For this I'm using Rxx, which provides extension methods in the System.Net.Sockets namespace, and also provides the ObservableTcpListener static factory class.

Here's what I have so far, pieced together from various sources:

IPEndPoint endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9001);
TcpListener listener = new TcpListener(endpoint);

IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
    .Finally(listener.Stop);

clients.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});

private static IObservable<TcpClient> SocketToTcpClient(Socket socket)
{
    TcpClient client = new TcpClient();
    client.Client = socket;
    return Observable.Return<TcpClient>(client);
}

private static IObservable<byte[]> OnConnect(TcpClient client)
{
    return client.Client.ReceiveUntilCompleted(SocketFlags.None);
}

private static void OnMessage(TcpClient client, byte[] message)
{
    Console.WriteLine("Message Received! - {0}", Encoding.UTF8.GetString(message));
}

private static void OnCompleted(TcpClient client)
{
    Console.WriteLine("Completed.");
}

private static void OnException(TcpClient client, Exception ex)
{
    Console.WriteLine("Exception: {0}", ex.ToString());
}

This works... up to a point. I can make a single client connection, but as soon as that connection terminates, the Observable sequence seems to terminate as well and .Finally(listener.Stop) is called. Obviously, that's not what I want.

I tried using the ObservableTcpListener.Start() factory method instead, but that gives me the exact same result.

IObservable<TcpClient> sockets = ObservableTcpListener.Start(endpoint);
sockets.Subscribe(client =>
{
    OnConnect(client).Subscribe(
        message => OnMessage(client, message),
        ex => OnException(client, ex),
        () => OnCompleted(client));
});

I suppose I do understand the problem here: the clients observable sequence is simply empty after the first client terminates, thus .Finally(listener.Stop) is called.

What would I need to do to circumvent this? How can I keep listening for incoming connections?

romatthe asked Jan 23 '16 15:01

1 Answer

Make your observable hot, so that a single underlying subscription is shared and stays alive while there are subscribers, by appending Publish().RefCount():

IObservable<TcpClient> clients = listener
    .StartSocketObservable(1)
    .SelectMany<Socket, TcpClient>(socket => SocketToTcpClient(socket))
    .Finally(listener.Stop)
    .Publish().RefCount();
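
As a rough illustration of what Publish().RefCount() does, here is a minimal sketch using plain System.Reactive rather than Rxx. The Subject<int> is a hypothetical stand-in for the listener's connection stream, and finallyCalls is just a counter for the demo; neither comes from the question. It only shows that the Finally action is tied to the single shared upstream subscription instead of to each individual subscriber.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class Program
{
    static void Main()
    {
        // Hypothetical stand-in for the stream of incoming connections.
        var source = new Subject<int>();
        int finallyCalls = 0;

        // Publish() shares one upstream subscription among all subscribers;
        // RefCount() connects on the first subscriber and disconnects on the last.
        IObservable<int> shared = source
            .Finally(() => finallyCalls++)
            .Publish()
            .RefCount();

        IDisposable first = shared.Subscribe(x => Console.WriteLine("A: {0}", x));
        IDisposable second = shared.Subscribe(x => Console.WriteLine("B: {0}", x));

        source.OnNext(1); // both subscribers see the same notification

        first.Dispose();  // one subscriber remains, so upstream stays connected
        Console.WriteLine("Finally calls after first dispose: {0}", finallyCalls);

        second.Dispose(); // last subscriber gone: upstream is released, Finally runs
        Console.WriteLine("Finally calls after second dispose: {0}", finallyCalls);
    }
}
```

Without Publish().RefCount(), each Subscribe call would create its own upstream subscription, and the Finally action would run once per subscription as each one is disposed.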
supertopi answered Oct 03 '22 00:10