In this made up example purely for practice, here is what I want to return:
If two students join a school within a specified period of time, say, 2 seconds, then I want a data structure returning both the students, the school they joined and the time interval between their joining.
I have been thinking along these lines:
class Program
{
static void Main(string[] args)
{
ObserveStudentsJoiningWithin(TimeSpan.FromSeconds(2));
}
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
var school = new School("School 1");
var admissionObservable =
Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");
var observable = admissionObservable.TimeInterval()
.Scan((current, next) =>
{
if (next.Interval - current.Interval <= timeSpan)
{
// But this won't work for me because
// this requires me to return a TSource
// and not a TResult
}
});
var subscription = observable.Subscribe(TimeIntervalValueHandler);
school.FillWithStudentsAsync(10, TimeSpan.FromSeconds(3));
school.FillWithStudentsAsync(8, TimeSpan.FromSeconds(1));
Console.WriteLine("Press any key to exit the program");
Console.ReadKey();
subscription.Dispose();
}
}
And here is the domain:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SchoolManagementSystem
{
public class Student
{
private static int _studentNumber;
public Student(string name)
{
Name = name;
}
public string Name { get; set; }
public static Student CreateRandom()
{
var name = string.Format($"Student {++_studentNumber}");
return new Student(name);
}
public override string ToString()
{
return Name;
}
}
public class School: IEnumerable<Student>
{
private List<Student> _students;
public event StudentAdmitted StudentAdmitted;
public string Name { get; set; }
public School(string name)
{
Name = name;
_students = new List<Student>();
}
public void AdmitStudent(Student student)
{
if (!_students.Contains(student))
{
_students.Add(student);
OnStudentAdmitted(this, student);
}
}
protected virtual void OnStudentAdmitted(School school, Student student)
{
var args = new StudentAdmittedEventArgs(school, student);
StudentAdmitted?.Invoke(this, args);
}
public IEnumerator<Student> GetEnumerator()
{
return _students.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
public delegate void StudentAdmitted(object sender, StudentAdmittedEventArgs args);
public class StudentAdmittedEventArgs : EventArgs
{
public StudentAdmittedEventArgs(School school, Student student): base()
{
School = school;
Student = student;
}
public School School { get; protected set; }
public Student Student { get; protected set; }
}
public static class Extensions
{
public async static void FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
{
if (school == null)
throw new ArgumentNullException("school");
if (howMany < 0)
throw new ArgumentOutOfRangeException("howMany");
if (howMany == 1)
{
school.AdmitStudent(Student.CreateRandom());
return;
}
for (int i = 0; i < howMany; i++)
{
await Task.Delay(gapBetweenEachAdmission);
school.AdmitStudent(Student.CreateRandom());
}
}
}
}
However, the Scan
operator allows me to only return an observable of the same TSource
. Select
also won't work here because I cannot look ahead (something that I can do with Scan
) and project the current item along with the next one, even though Select
allows me to transform TSource
into TResult
.
I am looking for something in between.
next.Interval - current.Interval <= timeSpan
. What you really want is something like pair[1].Timestamp - pair[0].Timestamp <= timeSpan
Following results in 4 pairs (Student 11, Student 12), (Student 13, Student 14), (Student 15, Student 16), (Student 17, Student 18):
var admissionObservable = Observable
.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
.Timestamp()
.Buffer(2)
.Where(pair => pair[1].Timestamp - pair[0].Timestamp <= timeSpan)
.Select(pair => new JoiningData
{
Students = Tuple.Create(pair[0].Value.EventArgs.Student, pair[1].Value.EventArgs.Student),
School = pair[0].Value.EventArgs.School,
Interval = pair[1].Timestamp - pair[0].Timestamp
});
Following results in 8 pairs (Student 10, Student 11) (Student 11, Student 12), (Student 12, Student 13), (Student 13, Student 14), (Student 14, Student 15), (Student 15, Student 16), (Student 16, Student 17), (Student 17, Student 18):
var admissionObservable = Observable
.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
.Timestamp();
admissionObservable
.Zip(admissionObservable.Skip(1), (a, b) => Tuple.Create(a,b))
.Where(pair => pair.Item2.Timestamp - pair.Item1.Timestamp <= timeSpan)
.Select(pair => new JoiningData
{
Students = Tuple.Create(pair.Item1.Value.EventArgs.Student, pair.Item2.Value.EventArgs.Student),
School = pair.Item1.Value.EventArgs.School,
Interval = pair.Item2.Timestamp - pair.Item1.Timestamp
});
Can you try this and see if it gives you what you want?
IObservable<EventPattern<StudentAdmittedEventArgs>[]> observable =
admissionObservable
.Publish(pxs =>
pxs
.Window(pxs, x => Observable.Timer(timeSpan))
.Select(ys => ys.Take(2)))
.SelectMany(ys => ys.ToArray())
.Where(ys => ys.Skip(1).Any());
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With