
How to conditionally buffer RACSignal values?

I'm working on some code that interacts with a remote API via websockets. My data layer is responsible for establishing and monitoring the websocket connection. It also exposes methods the application can use to enqueue websocket messages to be sent. The application code should not have to inspect the state of the websocket connection; from its point of view, sending is fire-and-forget.

Ideally, I'd like the data layer to function as follows:

  • When the data layer does not have a connection to the websocket endpoint (self.isConnected == NO), messages are buffered internally.
  • When a connection becomes available (self.isConnected == YES), buffered messages are sent immediately, and any subsequent messages are sent as soon as they are enqueued.

Here's what I've been able to come up with:

#import "RACSignal+Buffering.h"

@implementation RACSignal (Buffering)

- (RACSignal*)bufferWithSignal:(RACSignal*)shouldBuffer
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {

        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        NSMutableArray* bufferedValues = [[NSMutableArray alloc] init];
        __block BOOL buffering = NO;

        // Flushes everything collected so far, but only while buffering is off.
        void (^bufferHandler)(void) = ^{
            if (!buffering)
            {
                for (id val in bufferedValues)
                {
                    [subscriber sendNext:val];
                }

                [bufferedValues removeAllObjects];
            }
        };

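        // The block parameter is named differently from the method's
        // shouldBuffer argument so that it does not shadow it.
        RACDisposable* bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber* value) {

            buffering = value.boolValue;
            bufferHandler();

        }];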

        if (bufferDisposable)
        {
            [disposable addDisposable:bufferDisposable];
        }

        RACDisposable* valueDisposable = [self subscribeNext:^(id x) {

            [bufferedValues addObject:x];
            bufferHandler();

        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];

        if (valueDisposable)
        {
            [disposable addDisposable:valueDisposable];
        }

        return disposable;
    }];
}

@end

Lastly, this is pseudo-code for how it would be used:

@interface WebsocketDataLayer ()

@property (nonatomic) RACSubject* requests;

@end

@implementation WebsocketDataLayer

- (id)init
{
    self = [super init];

    if (self) {

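        // -bufferWithSignal: buffers while its argument sends YES, so invert
        // the connection state: buffer while *not* connected.
        RACSignal* disconnectedSignal = [RACObserve(self, connected) not];

        self.requests = [RACSubject subject];

        RACSignal* bufferedApiRequests = [self.requests bufferWithSignal:disconnectedSignal];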

        [self rac_liftSelector:@selector(sendRequest:) withSignalsFromArray:@[bufferedApiRequests]];
    }
    return self;
}

- (void)enqueueRequest:(NSString*)request
{
    [self.requests sendNext:request];
}

- (void)sendRequest:(NSString*)request
{
    DebugLog(@"Making websocket request: %@", request);
}

@end

My question is: Is this the right approach for buffering values? Is there a more idiomatic RAC way of handling this?

Matt Hupman asked Oct 22 '13 18:10


1 Answer

Buffering can be thought of as something that applies to individual requests, which leads to a natural implementation using -flattenMap: and RACObserve:

@weakify(self);
RACSignal *bufferedRequests = [self.requests flattenMap:^(NSString *request) {
    @strongify(self);

    // Waits for self.connected to be YES, or checks that it already is,
    // then forwards the request.
    return [[[[RACObserve(self, connected)
        ignore:@NO]
        take:1]
        // Replace the property value with our request.
        mapReplace:request];
}];

If ordering is important, you can replace -flattenMap: with -map: plus -concat. These implementations avoid the need for any custom operators, and work without manual subscriptions (which are notoriously messy).
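For reference, here is a minimal sketch of what the ordered -map: plus -concat variant could look like, assuming ReactiveCocoa 2.x. The class layout mirrors the pseudo-code in the question; the `connected` property declaration, the header paths, and the NSLog call are assumptions made for illustration, not part of the original code. Because -concat only subscribes to the next inner signal once the previous one has completed, requests should come out in the order they were enqueued:

```objectivec
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>
#import <ReactiveCocoa/RACEXTScope.h> // @weakify/@strongify; path assumed for RAC 2.x

@interface WebsocketDataLayer : NSObject
// Assumed declaration: the question only shows RACObserve(self, connected).
@property (nonatomic, getter=isConnected) BOOL connected;
@property (nonatomic, strong) RACSubject *requests;
- (void)enqueueRequest:(NSString *)request;
@end

@implementation WebsocketDataLayer

- (instancetype)init
{
    self = [super init];
    if (self) {
        _requests = [RACSubject subject];

        @weakify(self);
        RACSignal *orderedRequests = [[self.requests
            map:^(NSString *request) {
                @strongify(self);
                // One inner signal per request: it sends the request once
                // self.connected is (or becomes) YES, then completes.
                return [[[RACObserve(self, connected)
                    ignore:@NO]
                    take:1]
                    mapReplace:request];
            }]
            // Subscribe to each inner signal only after the previous one
            // has completed, which preserves enqueue order.
            concat];

        [self rac_liftSelector:@selector(sendRequest:)
          withSignalsFromArray:@[orderedRequests]];
    }
    return self;
}

- (void)enqueueRequest:(NSString *)request
{
    [self.requests sendNext:request];
}

- (void)sendRequest:(NSString *)request
{
    NSLog(@"Making websocket request: %@", request);
}

@end
```

With this sketch, a request enqueued while `connected` is NO holds back every later request until the connection comes up, so a stalled connection delays the whole queue rather than reordering it.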

Justin Spahr-Summers answered Oct 10 '22 00:10