
How to deal with concurrency issues brought by NSStream run loop scheduling using GCD?

I create a GCD dispatch queue and, in a block on that queue, schedule an NSStream on the current NSRunLoop, which its specification requires for the stream to emit delegate events. I then keep that thread's run loop running with [[NSRunLoop currentRunLoop] run].

This generates three possible scenarios:

  1. Create a serial queue in which an initial write message is sent through the stream, and further write messages are sent only from a delegate callback of the NSStream object. Writing new messages outside this pattern (which would be desirable) fails, because the queue is blocked by the block that is running the run loop.

  2. Create a concurrent queue in which messages can be written to the stream freely, since blocks sent to the queue execute concurrently with the block that is running the run loop. However, while it is desirable for writes to run concurrently with the run loop, it is certainly not desirable to have two blocks in the queue attempting to write to the stream at the same time.

  3. Create two queues: one responsible for keeping the run loop alive and receiving read-from-stream callbacks, and another for sending asynchronous write messages to the stream. This would seem ideal, but it seems that the NSStream documentation specifically states that one should not read from or write to a stream outside the thread it is scheduled on.
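
Scenario 1 can be reproduced without any stream at all. The following is a minimal Swift sketch, not code from a real project: a semaphore wait stands in for the blocking run-loop call, and the names (demonstrateStarvation, gate, done) are made up for illustration.

```swift
import Foundation

/// Returns (starvedWhileBlocked, ranAfterRelease).
func demonstrateStarvation() -> (Bool, Bool) {
    let serial = DispatchQueue(label: "example.serial")  // serial by default
    let gate = DispatchSemaphore(value: 0)  // stands in for the run loop returning
    let done = DispatchSemaphore(value: 0)  // signalled when the "write" block runs

    // Stand-in for the block that calls [[NSRunLoop currentRunLoop] run]:
    // it occupies the serial queue until the gate is opened.
    serial.async { gate.wait() }

    // A "write" block queued behind it cannot start while the first block runs.
    serial.async { done.signal() }

    let starved = done.wait(timeout: .now() + 0.2) == .timedOut

    // Once the first block returns, the queued block finally executes.
    gate.signal()
    let ranLater = done.wait(timeout: .now() + 2.0) == .success
    return (starved, ranLater)
}
```

As long as the first block never returns, which is the case for a run loop that keeps running, nothing else submitted to that serial queue gets a turn.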

Given these scenarios, none of which is ideal, how can I solve these problems?

Matoe asked Jul 09 '15 01:07



2 Answers

Late to the party, but instead of using run loops you can assign the desired dispatch queue to your streams directly using

void CFReadStreamSetDispatchQueue(CFReadStreamRef stream, dispatch_queue_t q);
void CFWriteStreamSetDispatchQueue(CFWriteStreamRef stream, dispatch_queue_t q);

Here the CFReadStreamRef parameter accepts a bridged NSInputStream and the CFWriteStreamRef parameter a bridged NSOutputStream. This way you don't have to schedule or unschedule the streams on run loops at all, and their events are delivered in the background on the queue you supplied.

Snippet from this Apple sample code:

CFReadStreamSetDispatchQueue((__bridge CFReadStreamRef) self.inputStream,  self.queue);
CFWriteStreamSetDispatchQueue((__bridge CFWriteStreamRef) self.outputStream, self.queue);

In Swift, you can just directly call the functions:

CFReadStreamSetDispatchQueue(inputStream, streamQueue)
CFWriteStreamSetDispatchQueue(outputStream, streamQueue)
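
Applied to the original question, one way to avoid concurrent writes is to make that queue serial and route every write through it. The following is only a sketch under assumptions: it assumes an Apple platform where CFWriteStreamSetDispatchQueue is available, and SerialStreamWriter, send, flush and pending are hypothetical names, not part of any Apple API or of the sample code mentioned above.

```swift
import Foundation

/// Hypothetical helper: all access to the stream happens on one serial queue.
final class SerialStreamWriter: NSObject, StreamDelegate {
    private let queue = DispatchQueue(label: "example.stream")  // serial by default
    private let output: OutputStream
    private var pending = Data()  // bytes not yet accepted by the stream

    init(output: OutputStream) {
        self.output = output
        super.init()
        output.delegate = self
        // Deliver delegate events on `queue` instead of a run loop.
        CFWriteStreamSetDispatchQueue(output, queue)
        output.open()
    }

    /// Callable from any thread: the work is moved onto the stream's queue.
    func send(_ data: Data) {
        queue.async {
            self.pending.append(data)
            self.flush()
        }
    }

    /// Must run on `queue`. Writes as much of `pending` as the stream accepts.
    private func flush() {
        while !pending.isEmpty && output.hasSpaceAvailable {
            let written = pending.withUnsafeBytes { (raw: UnsafeRawBufferPointer) -> Int in
                guard let base = raw.bindMemory(to: UInt8.self).baseAddress else { return 0 }
                return output.write(base, maxLength: raw.count)
            }
            guard written > 0 else { break }  // error or nothing written; wait for the next event
            pending = Data(pending.dropFirst(written))
        }
    }

    // Invoked on `queue` because of CFWriteStreamSetDispatchQueue above.
    func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
        if eventCode.contains(.hasSpaceAvailable) { flush() }
    }
}
```

Because the queue is serial, a send() call and a delegate callback can never touch the stream at the same time, and no block has to keep a run loop alive.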
Mux answered Nov 18 '22 02:11



As you noted from the docs, when you have a run-loop-based API like NSStream, the general expectation is that all interaction with that object will occur on the thread that owns the run loop on which it's scheduled. I'm not sure there's really any benefit to mixing these two idioms (GCD and run loops) when it comes to working with NSStream.

Other than the main queue, GCD has no concept of thread-affinity, so unless the run loop you schedule the NSStream on happens to be the main thread run loop, there's no good way to use dispatch_async to schedule blocks for execution on that thread.

At the risk of stating the obvious, you should probably just use the standard mechanisms for invoking methods on other threads; -performSelector:onThread:withObject:waitUntilDone:modes: is the most obvious one. If your concern is that you want to work with blocks, it helps to know that heap-allocated blocks can be treated like Objective-C objects and respond to the -invoke selector just as NSInvocation objects do. A trivial example relevant to your question might look like this:

@interface AppDelegate ()
{
    NSThread* bgthread;
}
@end

@implementation AppDelegate

- (void)applicationDidFinishLaunching:(NSNotification *)aNotification
{
    // Basic loop to get the background thread to run until you call -cancel on it
    dispatch_block_t threadMain = [^{
        NSThread* thread = [NSThread currentThread];
        NSParameterAssert(![thread isMainThread]);

        NSRunLoop* currentRunLoop = [NSRunLoop currentRunLoop];
        NSPort* port = [NSPort port];

        // If we don't register a port with the run loop, it will just exit immediately
        [currentRunLoop addPort: port forMode: NSRunLoopCommonModes];

        // Loop until the thread is cancelled.
        while (!thread.cancelled)
        {
            [currentRunLoop runMode: NSDefaultRunLoopMode beforeDate: [NSDate distantFuture]];
        }

        [currentRunLoop removePort: port forMode: NSRunLoopCommonModes];

        [port invalidate];
        port = nil;
    } copy];

    // Start the thread
    bgthread = [[NSThread alloc] initWithTarget: threadMain selector: @selector(invoke) object: nil];
    [bgthread start];

    // Fetch the runloop, so you can schedule an NSStream on it...
    __block NSRunLoop* runloopForStream = nil;
    dispatch_block_t getrunloop = [^{
        runloopForStream = [NSRunLoop currentRunLoop];
    } copy];

    // Dispatch synchronously, so that runloopForStream is populated before we continue...
    [getrunloop performSelector: @selector(invoke) onThread: bgthread withObject: nil waitUntilDone: YES];

    // Schedule your stream, etc.
    NSOutputStream* mystream = ...; // Your code here...
    [mystream scheduleInRunLoop: runloopForStream forMode: NSDefaultRunLoopMode];

    // Then later, when you want to write some data...
    NSData* dataToWrite = [NSMutableData dataWithLength: 100];
    dispatch_block_t doWrite = [^{
        [mystream write: dataToWrite.bytes maxLength: dataToWrite.length];
    } copy];

    // Dispatch asynchronously to thread
    [doWrite performSelector: @selector(invoke) onThread: bgthread withObject: nil waitUntilDone: NO];
}
@end

Note that the -copy of each block is necessary to get it copied to the heap; otherwise the block lives on the stack and becomes invalid once the declaring method returns.

ipmcc answered Nov 18 '22 01:11
