
Measuring response time using observables

Given Observable A representing incoming requests ...

interface IRequest
{
  string Id { get; }
}

IObservable<IRequest> requests;

and Observable B representing responses ...

IObservable<string> responses;

I need to measure the time it takes to produce a response. My current solution is shown below. It works, but I'm not sure whether it could be simplified or implemented in a more streamlined way. I'm also concerned about the deviation between the actual response time ([GEN]) and the observed response time ([OBS]), which becomes quite significant when generatedResponseTime is set below 50 ms.

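using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Program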
{
    public class Request
    {
        public string Id { get; set; }
    }

    static void Main(string[] args)
    {
        var generatedResponseTime = 100;

        var responseSub = new Subject<string>();

        IObservable<Request> requests = Observable.Interval(TimeSpan.FromMilliseconds(1000))
            .Select(x=> new Request { Id = x.ToString("d") });

        IObservable<string> responses = responseSub.AsObservable();

        var responseTime = requests
            .Where(x => !string.IsNullOrEmpty(x.Id))
            .Select(req => new { Id = req.Id, Start = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() })
            .SelectMany(req => responses
                .Where(reqId => reqId == req.Id)
                .Select(_ => new { RequestId = req.Id, ResponseTime = (int)(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - req.Start) })
                .Take(1))
            .Publish()
            .RefCount();

        requests.ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(req =>
            {
                Console.WriteLine($"[GEN] {req.Id} = {generatedResponseTime} ms");

                Thread.Sleep(generatedResponseTime);
                responseSub.OnNext(req.Id);
            });

        responseTime.Subscribe(x => Console.WriteLine($"[OBS] {x.RequestId} = {x.ResponseTime} ms"));

        Console.ReadLine();
    }
}
Oliver Weichhold asked Dec 09 '25 11:12


1 Answer

Here's an alternative that uses built-in operators. It also avoids the multiple-subscription problem in your solution:

var responseTime2 = requests
    .Timestamp()
    .Join(responses.Timestamp(),
        request => Observable.Never<Unit>(), // timeout criteria: as written, a request window never closes
        response => Observable.Empty<Unit>(), // a response is only paired with requests that are already open
        (request, response) => (request: request, response: response)
    )
    .Where(t => t.request.Value.Id == t.response.Value)
    .Select(t => t.response.Timestamp.ToUnixTimeMilliseconds() - t.request.Timestamp.ToUnixTimeMilliseconds());

If you have any sort of timeout criterion for a request, it would help performance to put it where the comment indicates. Otherwise every request stays in the join window forever, so the Join/Where combination has to compare each new response against an ever-growing set of requests and gets slower over time.
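As a rough sketch of what such a timeout could look like, assuming a request that has received no response within a fixed window can simply be abandoned: the request duration selector returns Observable.Timer(timeout) instead of Observable.Never<Unit>(). The ResponseTimer class, the Measure method, and the timeout parameter are hypothetical names used only for illustration, and the snippet assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ResponseTimer
{
    // Pairs each request id with the response carrying the same id and
    // emits the time between the two timestamps.
    // A request leaves the join window once `timeout` has elapsed, so the
    // set of open requests stays bounded (assumption: late responses may be dropped).
    public static IObservable<(string Id, TimeSpan Elapsed)> Measure(
        IObservable<string> requestIds,
        IObservable<string> responses,
        TimeSpan timeout)
    {
        return requestIds
            .Timestamp()
            .Join(responses.Timestamp(),
                request => Observable.Timer(timeout),   // request window closes after the timeout
                response => Observable.Empty<Unit>(),   // response window closes immediately
                (request, response) => (request, response))
            .Where(t => t.request.Value == t.response.Value)
            .Select(t => (Id: t.request.Value,
                          Elapsed: t.response.Timestamp - t.request.Timestamp));
    }
}
```

With the observables from the question, this could be called as ResponseTimer.Measure(requests.Select(r => r.Id), responses, TimeSpan.FromSeconds(30)); the 30 second value is an arbitrary placeholder and should be chosen to exceed the longest response time you still want to measure.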

Shlomo answered Dec 12 '25 07:12
