I'm working on some code that interacts with a remote API via websockets. My data layer is responsible for establishing and monitoring the websocket connection. It also contains methods that the application can use to enqueue websocket messages to be sent. The application code should not be responsible for inspecting the state of the websocket connection; in other words, sending should be fire-and-forget.
Ideally, I'd like the data layer to function as follows:
1. When the websocket is not connected (self.isConnected == NO), messages are buffered internally.
2. When the websocket becomes connected (self.isConnected == YES), buffered messages are immediately sent, and any subsequent messages are sent immediately.
Here's what I've been able to come up with:
#import "RACSignal+Buffering.h"

@implementation RACSignal (Buffering)

// Forwards the receiver's values, holding them back while the latest value
// sent by `shouldBuffer` is @YES and flushing them once it sends @NO.
- (RACSignal *)bufferWithSignal:(RACSignal *)shouldBuffer
{
    return [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
        NSMutableArray *bufferedValues = [[NSMutableArray alloc] init];
        __block BOOL buffering = NO;

        // Sends everything queued so far, unless we are currently buffering.
        void (^bufferHandler)(void) = ^{
            if (!buffering)
            {
                for (id val in bufferedValues)
                {
                    [subscriber sendNext:val];
                }
                [bufferedValues removeAllObjects];
            }
        };

        RACDisposable *bufferDisposable = [shouldBuffer subscribeNext:^(NSNumber *value) {
            buffering = value.boolValue;
            bufferHandler();
        }];
        if (bufferDisposable)
        {
            [disposable addDisposable:bufferDisposable];
        }

        // Every value goes through the queue so that ordering is preserved.
        RACDisposable *valueDisposable = [self subscribeNext:^(id x) {
            [bufferedValues addObject:x];
            bufferHandler();
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
        if (valueDisposable)
        {
            [disposable addDisposable:valueDisposable];
        }

        return disposable;
    }];
}

@end
Lastly, this is pseudo-code for how it would be used:
@interface WebsocketDataLayer ()
@property (nonatomic) RACSubject *requests;
@end

@implementation WebsocketDataLayer

- (id)init
{
    self = [super init];
    if (self) {
        // `connected` is assumed to be a KVO-compliant BOOL property declared elsewhere.
        RACSignal *connectedSignal = RACObserve(self, connected);
        self.requests = [[RACSubject alloc] init];
        // -bufferWithSignal: buffers while its argument sends @YES, so pass the
        // inverted connection state: buffer while we are NOT connected.
        RACSignal *bufferedApiRequests = [self.requests bufferWithSignal:[connectedSignal not]];
        [self rac_liftSelector:@selector(sendRequest:) withSignalsFromArray:@[bufferedApiRequests]];
    }
    return self;
}

- (void)enqueueRequest:(NSString *)request
{
    [self.requests sendNext:request];
}

- (void)sendRequest:(NSString *)request
{
    DebugLog(@"Making websocket request: %@", request);
}

@end
My question is: Is this the right approach for buffering values? Is there a more idiomatic RAC way of handling this?
Buffering can be thought of as something that applies to individual requests, which leads to a natural implementation using -flattenMap: and RACObserve:
@weakify(self);
RACSignal *bufferedRequests = [self.requests flattenMap:^(NSString *request) {
    @strongify(self);
    // Waits for self.connected to be YES, or checks that it already is,
    // then forwards the request.
    return [[[RACObserve(self, connected)
        ignore:@NO]
        take:1]
        // Replace the property value with our request.
        mapReplace:request];
}];
If ordering is important, you can replace -flattenMap: with -map: plus -concat. These implementations avoid the need for any custom operators, and work without manual subscriptions (which are notoriously messy).
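For reference, the ordered variant described above might look roughly like the following. This is only a sketch, assuming ReactiveCocoa 2.x (the RACEXTScope.h import path differs between releases), and the method name orderedBufferedRequests is a hypothetical helper that does not appear in the question. The class declares just the two properties the operator chain needs.

```objective-c
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h>
#import <ReactiveCocoa/RACEXTScope.h> // assumed location of @weakify/@strongify in RAC 2.x

// Minimal stand-in for the question's data layer.
@interface WebsocketDataLayer : NSObject
@property (nonatomic, getter=isConnected) BOOL connected;
@property (nonatomic, strong) RACSubject *requests;
- (RACSignal *)orderedBufferedRequests; // hypothetical helper, not from the question
@end

@implementation WebsocketDataLayer

- (instancetype)init
{
    self = [super init];
    if (self) {
        _requests = [RACSubject subject];
    }
    return self;
}

- (RACSignal *)orderedBufferedRequests
{
    @weakify(self);
    return [[self.requests
        map:^(NSString *request) {
            @strongify(self);
            // One inner signal per request: it waits until `connected` is YES
            // (or sees that it already is), then sends the request once.
            return [[[RACObserve(self, connected)
                ignore:@NO]
                take:1]
                mapReplace:request];
        }]
        // -concat subscribes to the inner signals one at a time, in arrival
        // order, so a later request cannot overtake an earlier one.
        concat];
}

@end
```

The resulting signal could then be wired up as in the question, for example by passing it to rac_liftSelector:withSignalsFromArray: with the sendRequest: selector.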