Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Implementing Observables into a persistent queue library

Currently writing a little persistent queue library that will read/write lines to a text file. Here is the add method, for example:

Queue.prototype.add = function(line, cb){

    getLock(this, err => {
        if(err){
            this.emit('error', err);
            releaseLock(err, cb);
        }
        else{
            fs.appendFile(this.filepath, line, err => {
               err && this.emit('error', err);
               releaseLock(err, cb);
            });
        }
    });
};

what I find quite awkward, is supporting event emitters and callbacks (or event emitters and promises).

In other words, for each method (add, peek, remove) on the queue, I need to return/callback a result that's specific to each call. Using an event emitter only means that the caller might act on a result that was not specific to the call they just made. So callbacks or promises here seem imperative - you can't use event emitters only.

What I am wondering is - can observables somehow solve the problem of having to pair callbacks with event emitters or promises with event emitters?

I am looking to find a way to implement this evented/asynchronous queue with only one type asynchronous callback mechanism. Maybe observables aren't the answer here, but I am looking for a good design pattern nonetheless.

like image 526
Alexander Mills Avatar asked Dec 16 '16 05:12

Alexander Mills


1 Answers

I'm not quite sure why do you need event emitters here.... If you use observables each subscriber will get the results/errors from their own call.

I would rewrite your method as such:

function appendFileObs(filePath, line){
    return Rx.Observable.create((obs) => {
        fs.appendFile(filePath, line, (err, result) => {
            if(err) obs.onError(err);
            else {
                obs.onNext(result);
                obs.onCompleted();
            }
        });
    });
});
// Similar for getLock and releaseLock


Queue.prototype.add = function(line){
    return getLockObs(this)
        .flatMap(() => appendFileObs(this.filePath, line))
        .flatMap(result => releaseLockObs(undefined).map(() => result))
        .catch((err) => {
            return releaseLockObs(err);
        });
};

On this solution I'm not proud that the stream has side effects inside, it's probably improvable but you get the idea.

This way, when someone calls .add(line).subscribe() it will get the result and the errors that happened on his call.

If you need to broadcast the errors that happen, you can use a BehaviourSubject, that's an observer and observable at the same time (useful stuff!)

like image 179
olivarra1 Avatar answered Oct 30 '22 21:10

olivarra1