
StackExchange.Redis Blocking pop design

We had an existing helper library that used ServiceStack.Redis, and we are currently trying to swap it for StackExchange.Redis. We were using BlockingPop (BLPOP), but since StackExchange.Redis doesn't support it, we implemented it as follows:

public static void Push(string Qname, string val)
{
    IDatabase db = redis.GetDatabase();
    db.ListLeftPush(Qname, val);
    ISubscriber sub = redis.GetSubscriber();
    sub.Publish(Qname + "_msg", "1");
}

and Pop with blocking option as follows:

public static string Pop(string Qname,
    bool block_until_available = false, int timeout_secs = 0)
{
    IDatabase db = redis.GetDatabase();            
    var popped = db.ListRightPop(Qname);
    if (popped.IsNull)
    {
        if (block_until_available == false)
            return null;
    }
    else
        return popped;

    //wait for an item to be pushed in.
    ISubscriber sub = redis.GetSubscriber();
    AutoResetEvent autoEvent = new AutoResetEvent(false);
    string obj = null;
    Task.Run(() =>
    {
        sub.Subscribe(Qname + "_msg", (channel, message) =>
        {
            popped = db.ListRightPop(Qname);
            if (!popped.IsNull)
            {
                obj = popped;
                sub.Unsubscribe(Qname + "_msg");
                autoEvent.Set();
            }
        });
    });
    if (timeout_secs > 0)
        autoEvent.WaitOne(timeout_secs * 1000);
    else
        autoEvent.WaitOne();
    return obj;
}

Do you see any obvious issues with this approach?

Also, I quickly ran into the following error. I have increased the syncTimeout; hopefully that will fix it?

System.TimeoutException: Timeout performing RPOP DL_PROD, 
inst: 0, mgr: ProcessReadQueue, err: never, queue: 0, qu: 0, qs: 0, qc: 0, 
wr: 0, wq: 0, in: 0, ar: 1, IOCP: (Busy=0,Free=1000,Min=8,Max=1000), 
WORKER: (Busy=2,Free=32765,Min=8,Max=32767), 
clientName: CD147RE1 at 
StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T]
(Message message, ResultProcessor`1 processor, ServerEndPoint server) 
coderguy123 asked Oct 31 '22 08:10


1 Answer

There's no specific reason RPOP should time out here, unless it is bandwidth-related (huge payloads). That error looks unrelated to the question, IMO.

The approach is a bit ugly but ... well, it might work. There's no need to use Task.Run here, though. One problem with the approach is that it won't work correctly under concurrency, I think: the call to sub.Unsubscribe(Qname + "_msg") seems to unsubscribe all delegates for that channel / connection, rather than just the one. Self-unsubscribing is possible, but frankly I wonder if it is easier to just have a single auto-reset-event and a single subscription, and if a message unlocks the gate while you're waiting: great.
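To make the "single auto-reset-event and a single subscription" idea concrete, here is a minimal sketch. The class name BlockingListQueue, its constructor shape, and the one-second re-poll interval are assumptions made for illustration; they are not part of StackExchange.Redis or of the original helper. It keeps the question's convention of publishing on the Qname + "_msg" channel after each push.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using StackExchange.Redis;

// Hypothetical helper: one instance per queue, one subscription, one gate.
public sealed class BlockingListQueue
{
    // Arbitrary safety-net interval: re-check the list even if no
    // notification arrived (pub/sub messages can be missed or coalesced).
    private static readonly TimeSpan RepollInterval = TimeSpan.FromSeconds(1);

    private readonly IDatabase db;
    private readonly string qname;
    private readonly AutoResetEvent gate = new AutoResetEvent(false);

    public BlockingListQueue(ConnectionMultiplexer redis, string qname)
    {
        this.qname = qname;
        db = redis.GetDatabase();

        var channel = new RedisChannel(qname + "_msg", RedisChannel.PatternMode.Literal);
        // Subscribe once. The handler only opens the gate; it never pops
        // and never unsubscribes, so callers cannot interfere with each other.
        redis.GetSubscriber().Subscribe(channel, (ch, msg) => gate.Set());
    }

    // Returns null if nothing arrived within timeoutSecs (0 = wait indefinitely).
    public string Pop(int timeoutSecs = 0)
    {
        var clock = Stopwatch.StartNew();
        while (true)
        {
            RedisValue popped = db.ListRightPop(qname);
            if (!popped.IsNull)
                return popped;

            TimeSpan wait = RepollInterval;
            if (timeoutSecs > 0)
            {
                TimeSpan remaining = TimeSpan.FromSeconds(timeoutSecs) - clock.Elapsed;
                if (remaining <= TimeSpan.Zero)
                    return null;
                if (remaining < wait)
                    wait = remaining;
            }

            // Sleep until a push notification opens the gate or the interval ends,
            // then loop around and try RPOP again.
            gate.WaitOne(wait);
        }
    }
}
```

Usage would be along the lines of var q = new BlockingListQueue(redis, "DL_PROD"); followed by q.Pop(timeoutSecs: 5). The pop always happens on the calling thread rather than inside the pub/sub callback, and the periodic re-poll means a waiter is not stranded if two notifications collapse into one signal.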


The general pattern of self-unsubscribing delegates is essentially:

YourDelegateType handler = null;
handler = (args) => {
    DoTheThing();
    UnSubscribe(handler);
};
Subscribe(handler);

This uses the joy of lexically captured variables to provide access to the delegate instance inside the delegate, which means the delegate can pass itself to an unsubscribe method. The pattern shown above should work for all delegate-based callback scenarios, including regular events and things like SE.Redis pub/sub handlers.
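As a small runnable illustration of the pattern above, here is a sketch using a plain .NET event. The Ticker and OneShotDemo types are made up for this example; only the self-unsubscribing handler is the point.

```csharp
using System;

// Hypothetical event source used only for this demonstration.
public class Ticker
{
    public event EventHandler Tick;

    public void Raise() => Tick?.Invoke(this, EventArgs.Empty);
}

public static class OneShotDemo
{
    // Raises the event twice and reports how often the handler actually ran.
    public static int CountOneShotCalls()
    {
        var ticker = new Ticker();
        int calls = 0;

        // Declare first, assign second: the lambda captures the variable
        // `handler`, so it can refer to itself when it runs.
        EventHandler handler = null;
        handler = (sender, e) =>
        {
            calls++;
            ticker.Tick -= handler; // remove only this delegate
        };

        ticker.Tick += handler;
        ticker.Raise(); // handler runs and detaches itself
        ticker.Raise(); // no handlers left, nothing runs
        return calls;
    }
}
```

For SE.Redis the same shape applies with an Action<RedisChannel, RedisValue> handler: pass that handler as the second argument to sub.Unsubscribe(channel, handler) so that only the one delegate is removed, rather than every handler on the channel.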

Marc Gravell answered Nov 08 '22 04:11