Is there an operator that works like Scan but lets me return an IObservable<TResult> instead of IObservable<TSource>?

In this made-up example, purely for practice, here is what I want to return:

If two students join a school within a specified period of time, say, 2 seconds, then I want a data structure containing both students, the school they joined, and the time interval between their joinings.

I have been thinking along these lines:

class Program
{
    static void Main(string[] args)
    {
        ObserveStudentsJoiningWithin(TimeSpan.FromSeconds(2));
    }

    static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
    {
        var school = new School("School 1");

        var admissionObservable =
            Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");

        var observable = admissionObservable.TimeInterval()
            .Scan((current, next) =>
            {
                if (next.Interval - current.Interval <= timeSpan)
                {
                    // But this won't work for me because
                    // this requires me to return a TSource
                    // and not a TResult
                }
            });

        var subscription = observable.Subscribe(TimeIntervalValueHandler);

        school.FillWithStudentsAsync(10, TimeSpan.FromSeconds(3));
        school.FillWithStudentsAsync(8, TimeSpan.FromSeconds(1));

        Console.WriteLine("Press any key to exit the program");
        Console.ReadKey();
        subscription.Dispose();
    }
}

And here is the domain:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace SchoolManagementSystem
{
    public class Student
    {
        private static int _studentNumber; 

        public Student(string name)
        {
            Name = name;
        }

        public string Name { get; set; }

        public static Student CreateRandom()
        {
            // The interpolated string is already formatted; wrapping it in string.Format is redundant.
            var name = $"Student {++_studentNumber}";

            return new Student(name);
        }

        public override string ToString()
        {
            return Name;
        }
    }

    public class School: IEnumerable<Student>
    {
        private List<Student> _students;

        public event StudentAdmitted StudentAdmitted;

        public string Name { get; set; }

        public School(string name)
        {
            Name = name;
            _students = new List<Student>();
        }

        public void AdmitStudent(Student student)
        {
            if (!_students.Contains(student))
            {
                _students.Add(student);

                OnStudentAdmitted(this, student);
            }
        }

        protected virtual void OnStudentAdmitted(School school, Student student)
        {
            var args = new StudentAdmittedEventArgs(school, student);

            StudentAdmitted?.Invoke(this, args);
        }

        public IEnumerator<Student> GetEnumerator()
        {
            return _students.GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }


    public delegate void StudentAdmitted(object sender, StudentAdmittedEventArgs args);

    public class StudentAdmittedEventArgs : EventArgs
    {
        public StudentAdmittedEventArgs(School school, Student student): base()
        {
            School = school;
            Student = student;
        }

        public School School { get; protected set; }
        public Student Student { get; protected set;  }
    }

    public static class Extensions
    {
        public async static void FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
        {
            if (school == null)
                throw new ArgumentNullException("school");

            if (howMany < 0)
                throw new ArgumentOutOfRangeException("howMany");

            if (howMany == 1)
            {
                school.AdmitStudent(Student.CreateRandom());
                return;
            }

            for (int i = 0; i < howMany; i++)
            {
                await Task.Delay(gapBetweenEachAdmission);

                school.AdmitStudent(Student.CreateRandom());
            }
        }
    }
}

However, the Scan operator only allows me to return an observable of the same TSource. Select won't work here either: although it lets me transform TSource into TResult, I cannot look ahead (something I can do with Scan) and project the current item along with the next one.

I am looking for something in between.

Water Cooler v2 asked Oct 29 '22 23:10


2 Answers

  1. For a pairwise comparison (as in the original question: projecting the current item along with the next one), you can use the Buffer method to build a sequence of pairs.
  2. To find the interval between the students' joinings, Timestamp is more useful than TimeInterval. TimeInterval already reports the time elapsed since the previous element, so the line next.Interval - current.Interval <= timeSpan compares two gaps rather than two arrival times. What you really want is something like pair[1].Timestamp - pair[0].Timestamp <= timeSpan.

The following yields 4 pairs: (Student 11, Student 12), (Student 13, Student 14), (Student 15, Student 16), (Student 17, Student 18):

var admissionObservable = Observable
        .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
        .Timestamp()
        .Buffer(2)
        .Where(pair => pair[1].Timestamp - pair[0].Timestamp <= timeSpan)
        .Select(pair => new JoiningData
        {
            Students = Tuple.Create(pair[0].Value.EventArgs.Student, pair[1].Value.EventArgs.Student),
            School = pair[0].Value.EventArgs.School,
            Interval = pair[1].Timestamp - pair[0].Timestamp
        });
  3. As @Enigmativity mentioned, it would be better to compare each element with the next one. For that purpose we can use the Zip method:

The following yields 8 pairs: (Student 10, Student 11), (Student 11, Student 12), (Student 12, Student 13), (Student 13, Student 14), (Student 14, Student 15), (Student 15, Student 16), (Student 16, Student 17), (Student 17, Student 18):

var admissionObservable = Observable
     .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
     .Timestamp();

// Pair every admission with the one that follows it (overlapping pairs).
var observable = admissionObservable
    .Zip(admissionObservable.Skip(1), (a, b) => Tuple.Create(a, b))
    // Keep only the pairs that joined within the given time span.
    .Where(pair => pair.Item2.Timestamp - pair.Item1.Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair.Item1.Value.EventArgs.Student, pair.Item2.Value.EventArgs.Student),
        School = pair.Item1.Value.EventArgs.School,
        Interval = pair.Item2.Timestamp - pair.Item1.Timestamp
    });
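
The difference between the two pairings is easier to see on a plain sequence of numbers. This is only a minimal sketch, assuming the System.Reactive package; PairingSketch, BufferPairs and ZipPairs are hypothetical names used for illustration, and Observable.Range stands in for the admission events:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PairingSketch
{
    // Buffer(2) cuts the sequence into consecutive, non-overlapping pairs.
    public static IList<string> BufferPairs(IObservable<int> source)
    {
        return source
            .Buffer(2)
            // A source with an odd number of items ends with a one-element buffer; skip it.
            .Where(pair => pair.Count == 2)
            .Select(pair => $"({pair[0]}, {pair[1]})")
            .ToList()
            .Wait(); // blocks until the source completes; fine for a finite demo sequence
    }

    // Zip with Skip(1) pairs every element with its successor, so the pairs overlap.
    public static IList<string> ZipPairs(IObservable<int> source)
    {
        return source
            .Zip(source.Skip(1), (a, b) => $"({a}, {b})")
            .ToList()
            .Wait();
    }
}
```

For Observable.Range(1, 6), BufferPairs gives (1, 2) (3, 4) (5, 6), while ZipPairs gives (1, 2) (2, 3) (3, 4) (4, 5) (5, 6). That is why the Buffer version can miss two students who joined close together but landed in different buffers.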
Andriy Tolstoy answered Nov 15 '22 04:11


Can you try this and see if it gives you what you want?

IObservable<EventPattern<StudentAdmittedEventArgs>[]> observable =
    admissionObservable
        .Publish(pxs =>
            pxs
                .Window(pxs, x => Observable.Timer(timeSpan))
                .Select(ys => ys.Take(2)))
        .SelectMany(ys => ys.ToArray())
        .Where(ys => ys.Skip(1).Any());
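
A different route, closer to the Scan attempt in the question: Rx also has a seeded overload, Scan(seed, accumulator), whose accumulator type can differ from the element type, so the result is an IObservable<TAccumulate>. The following is only a sketch under assumptions: Pairing, ScanState and CloseArrivals are hypothetical names, and plain student names stand in for the EventPattern<StudentAdmittedEventArgs> values that Timestamp() would wrap in the real pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical result type, standing in for the data structure the question asks for.
public sealed class Pairing
{
    public string First { get; set; }
    public string Second { get; set; }
    public TimeSpan Gap { get; set; }
}

// Hypothetical accumulator: the previous arrival plus the pairing (if any) from the latest step.
public sealed class ScanState
{
    public Timestamped<string>? Previous { get; set; }
    public Pairing Result { get; set; }
}

public static class SeededScanSketch
{
    public static IObservable<Pairing> CloseArrivals(
        this IObservable<Timestamped<string>> arrivals, TimeSpan limit)
    {
        return arrivals
            // Seeded Scan: the accumulator is a ScanState, not a Timestamped<string>.
            .Scan(new ScanState(), (state, next) =>
            {
                Pairing result = null;
                if (state.Previous.HasValue)
                {
                    var previous = state.Previous.Value;
                    var gap = next.Timestamp - previous.Timestamp;
                    if (gap <= limit)
                    {
                        result = new Pairing { First = previous.Value, Second = next.Value, Gap = gap };
                    }
                }
                // Always remember the latest arrival so the next step can compare against it.
                return new ScanState { Previous = next, Result = result };
            })
            // Drop the steps that produced no pairing, then unwrap the result.
            .Where(state => state.Result != null)
            .Select(state => state.Result);
    }
}
```

With real events this would be applied to the output of Timestamp(), with the element type changed accordingly. Because the state carries the previous item, each element is compared with its immediate predecessor, which gives the same overlapping pairs as the Zip approach.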
Enigmativity answered Nov 15 '22 04:11