Update: check out the example at the bottom
I need to message between classes. The publisher will loop indefinitely, call some method to get data, and then pass the result of that call into OnNext. There can be many subscribers, but there should only ever be one IObservable and one long-running task. Here is an implementation.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
[TestMethod]
public async Task RunMessagingAsync()
{
var subject = new Subject<string>();
//Create a class and inject the subject as IObserver
new Publisher(subject);
//Create a class and inject the subject as IObservable
new Subscriber(subject, 1.ToString());
new Subscriber(subject, 2.ToString());
new Subscriber(subject, 3.ToString());
//Run the loop for 3 seconds
await Task.Delay(3000);
}
class Publisher
{
public Publisher(IObserver<string> observer)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
observer.OnNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
}
}
Output:
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
Name: 1 Message: Hi
Name: 2 Message: Hi
Name: 3 Message: Hi
This works fine. Notice that only one IObserver sends messages, but all subscriptions pick up the message. But how do I separate the IObservable and the IObserver? They are glued together as a Subject. Here is another approach.
[TestMethod]
public async Task RunMessagingAsync2()
{
var observers = new List<IObserver<string>>();
var observable = Observable.Create(
(IObserver<string> observer) =>
{
observers.Add(observer);
Task.Run(async () =>
{
while (true)
{
try
{
observer.OnNext(GetSomeData());
}
catch (Exception ex)
{
observer.OnError(ex);
}
await Task.Delay(500);
}
});
return Disposable.Create(() => { });
});
//Create two subscribers and inject the observable as IObservable
new Subscriber(observable, 1.ToString());
new Subscriber(observable, 2.ToString());
//Run the loop for 10 seconds
await Task.Delay(10000);
Assert.IsTrue(ReferenceEquals(observers[0], observers[1]));
}
The problem here is that this creates two separate Tasks and two separate IObservers. Every subscription creates a new IObserver. You can confirm that because the Assert here fails. This doesn't really make sense to me. From what I understand of Reactive programming, I wouldn't expect the Subscribe method here to create a new IObserver each time. Check out this gist. It is a slight modification of the Observable.Create example. It shows how the Subscribe method causes an IObserver to be created each time it is called. How can I achieve the functionality from the first example without using a Subject?
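A minimal sketch of the behavior described above, assuming the System.Reactive package is referenced. The class and method names (ColdCreateDemo, CountDelegateRuns) are made up for illustration. It counts how often the delegate passed to Observable.Create runs when two observers subscribe.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical helper, not part of the original code.
public static class ColdCreateDemo
{
    // Returns how many times the Observable.Create delegate ran
    // after two separate Subscribe calls.
    public static int CountDelegateRuns()
    {
        var runs = 0;
        var cold = Observable.Create<string>(observer =>
        {
            runs++; // executed once per Subscribe call
            observer.OnNext("Hi");
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(s => Console.WriteLine($"First: {s}"));
        cold.Subscribe(s => Console.WriteLine($"Second: {s}"));
        return runs;
    }
}
```

Calling ColdCreateDemo.CountDelegateRuns() should return 2, which matches the failing Assert: each subscription gets its own run of the delegate and its own IObserver.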
Here is another approach that does not use Reactive Extensions at all. You could create a Subject from the publisher if you want to, but it is not necessary.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace UnitTestProject1
{
[TestClass]
public class UnitTest1
{
private static string GetSomeData() => "Hi";
class Publisher
{
public Publisher(Action<string> onNext)
{
Task.Run(async () =>
{
//Loop forever
while (true)
{
//Get some data, publish it with OnNext and wait 500 milliseconds
onNext(GetSomeData());
await Task.Delay(500);
}
});
}
}
class Subscriber
{
//Listen for OnNext and write to the debug window when it happens
public void ReceiveMessage(string message) => Debug.WriteLine(message);
}
[TestMethod]
public async Task RunMessagingAsync()
{
//Create the subscriber that will receive the messages
var subscriber = new Subscriber();
//Create the publisher and inject the subscriber's method as the onNext callback
new Publisher(subscriber.ReceiveMessage);
//Run the loop for 10 seconds
await Task.Delay(10000);
}
}
}
Lastly, I should add that ReactiveUI used to have a MessageBus class. I'm not sure if it got removed or not, but it is no longer recommended. What do they suggest we use instead?
Working Example
This version is correct. I guess the only thing I'm asking now is how do I do the equivalent of this with Observable.Create? The problem with Observable.Create is that it runs the action for each subscription. That is not the intended functionality. The long-running task here only runs once no matter how many subscriptions there are.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace UnitTestProject1
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
internal class BasicObservable<T> : IObservable<T>
{
List<IObserver<T>> _observers = new List<IObserver<T>>();
public BasicObservable(
Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default
) =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(interval ?? new TimeSpan(0, 0, 1));
var data = getData();
_observers.ForEach(o => o.OnNext(data));
}
catch (Exception ex)
{
_observers.ForEach(o => o.OnError(ex));
}
}
_observers.ForEach(o => o.OnCompleted());
}, cancellationToken);
public IDisposable Subscribe(IObserver<T> observer)
{
_observers.Add(observer);
return Disposable.Create(observer, (o) => _observers.Remove(o));
}
}
public static class ObservableExtensions
{
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, default, cancellationToken);
public static IObservable<T> CreateObservable<T>(
this Func<T> getData,
TimeSpan? interval = null,
CancellationToken cancellationToken = default)
=> new BasicObservable<T>(getData, interval, cancellationToken);
}
[TestClass]
public class UnitTest1
{
string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
Func<string> getData = GetData;
var publisher = getData.CreateObservable(cancellationToken);
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; true; i++)
{
if (i >= 5)
{
cancellationSource.Cancel();
//Stop the test once cancellation has been requested
break;
}
await Task.Delay(1000);
}
}
}
}
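First, you should familiarize yourself with the theory of "cold" and "hot" observables, as defined in the Introduction to Rx. In short, a cold observable is passive and starts producing notifications only when an observer subscribes, while a hot observable is active and produces notifications regardless of subscriptions.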
What you want is a hot observable, and the problem is that the Observable.Create method creates cold observables. But you can make any observable hot by using the Publish operator. This operator provides a way to have a single underlying subscription shared by multiple independent observers. Example:
int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
_ = Task.Run(async () =>
{
while (true)
{
observer.OnNext(++index);
await Task.Delay(1000);
}
});
return Disposable.Empty;
});
IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop
hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));
The coldObservable created by Observable.Create is subscribed to when the hotObservable.Connect method is invoked, and then all notifications generated by that single subscription are propagated to all subscribers of the hotObservable.
Output:
Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...
Important: the purpose of the example above is to demonstrate the Publish operator, not to serve as an example of good-quality Rx code. One of its problems is that, because the observers subscribe after the connection to the source is made, it is theoretically possible that the first notification will not be sent to some or all of the observers: it may be produced before they have subscribed. In other words, there is a race condition.
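One way to avoid that race is to attach the observers first and call Connect last. The sketch below assumes the System.Reactive package and uses Observable.Range as a stand-in for the cold source; the class name SubscribeBeforeConnectDemo is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, not part of the original answer.
public static class SubscribeBeforeConnectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in cold source that emits 1, 2, 3 and completes.
        var cold = Observable.Range(1, 3);
        var hot = cold.Publish();

        // Attach the observers first...
        hot.Subscribe(n => log.Add($"A{n}"));
        hot.Subscribe(n => log.Add($"B{n}"));

        // ...then start the single underlying subscription.
        hot.Connect();

        return log;
    }
}
```

Because both observers are already subscribed when Connect is called, each of them receives all three values, including the first one.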
There is an alternative method of managing the lifetime of an IConnectableObservable, the RefCount operator:
Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.
var hotObservable = coldObservable.Publish().RefCount();
This way you don't need to Connect manually. The connection occurs automatically with the first subscription, and it is disposed automatically with the last unsubscription.
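The connect and disconnect behavior can be observed with a small sketch, again assuming the System.Reactive package. The names RefCountDemo and trigger are made up for illustration, and the Subject is used only as a controllable stand-in for the data loop.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of the original answer.
public static class RefCountDemo
{
    // Returns the number of subscriptions made to the cold source and
    // whether that subscription was disposed after the last observer left.
    public static (int Connections, bool Disconnected) Run()
    {
        var connections = 0;
        var disconnected = false;
        var trigger = new Subject<int>(); // stand-in for the data loop

        var cold = Observable.Create<int>(observer =>
        {
            connections++; // counts subscriptions to the cold source
            var inner = trigger.Subscribe(observer);
            return Disposable.Create(() =>
            {
                disconnected = true;
                inner.Dispose();
            });
        });

        var shared = cold.Publish().RefCount();

        var a = shared.Subscribe(n => Console.WriteLine($"A received #{n}")); // connects
        var b = shared.Subscribe(n => Console.WriteLine($"B received #{n}")); // shares the connection
        trigger.OnNext(1);

        a.Dispose();
        b.Dispose(); // the last unsubscription disposes the connection

        return (connections, disconnected);
    }
}
```

RefCountDemo.Run() should report a single connection to the cold source for both observers, and that connection should be disposed once the second observer unsubscribes.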
I've added this as an answer because I feel that the code that Christian posted in his answer is dangerous as it's mixing Tasks and Rx and there are race conditions.
Here's an alternative that fixes most of these issues:
public class UnitTest1
{
private string GetData() => "Hi";
private IDisposable Subscriber(IObservable<string> observable, string name) =>
observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData());
var publisher = coldObservable.Publish();
var subscriptions =
new CompositeDisposable(
Subscriber(publisher, "One"),
Subscriber(publisher, "Two"),
publisher.Connect());
await Task.Delay(TimeSpan.FromSeconds(5.0));
subscriptions.Dispose();
}
}
Better yet, though, I would look at doing it this way:
public class UnitTest1
{
private string GetData() => "Hi";
private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
observable.Select(s => $"Name: {name} Message: {s}");
public async Task Messaging()
{
var coldObservable =
Observable
.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
.Select(_ => GetData())
.Do(_ => Debug.WriteLine("Called GetData()"))
.Publish(published =>
Observable
.Merge(
Subscriber(published, "One"),
Subscriber(published, "Two")))
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
.Do(x => Debug.WriteLine(x));
await coldObservable;
}
}
It's always best to use the inbuilt operators for Rx rather than hybrid approaches with tasks.
Thanks to the answer above, I eventually got the desired result without having to implement IObservable. Theodor was correct. The answer was to convert the IObservable to hot with the Publish() method.
I wrote an article about this here
While this works, Enigmativity's answer above is far better.
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Observables
{
class Subscriber
{
public string Name;
//Listen for OnNext and write to the debug window when it happens
public Subscriber(IObservable<string> observable, string name)
{
Name = name;
observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
}
}
[TestClass]
public class UnitTest1
{
static string GetData() => "Hi";
[TestMethod]
public async Task Messaging()
{
var cancellationSource = new CancellationTokenSource();
var cancellationToken = cancellationSource.Token;
var coldObservable = Observable.Create<string>(observer =>
{
_ = Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var data = GetData();
observer.OnNext(data);
await Task.Delay(1000);
}
}, cancellationToken);
return Disposable.Empty;
});
var publisher = coldObservable.Publish();
var connection = publisher.Connect();
new Subscriber(publisher, "One");
new Subscriber(publisher, "Two");
for (var i = 0; i < 5; i++)
{
if (i == 4)
{
cancellationSource.Cancel();
}
await Task.Delay(1000);
}
connection.Dispose();
}
}
}