Combine's subscribe(on:options:) operator

Tags: swift, frp, combine

I have a question about the subscribe(on:options:) operator. I would appreciate it if anyone could help me figure it out.

So what we have from the documentation:

Specifies the scheduler on which to perform subscribe, cancel, and request operations. In contrast with receive(on:options:), which affects downstream messages, subscribe(on:options:) changes the execution context of upstream messages.

Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using receive(on:options:)), messages will be sent on the Scheduler used for receiving a subscription.

This does not match what I actually see at runtime.

I have the following code:

Just("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

I would expect the following output:

Map: false
Sink: false

But instead I am getting:

Map: true
Sink: false

The same thing happens when I use a Sequence publisher.

If I swap the positions of the map operator and the subscribe operator, I get what I want:

Just("Some text")
    .subscribe(on: DispatchQueue.global())
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

Output:

Map: false
Sink: false

Interestingly, when I use the same order of operators as in my first listing with my own custom publisher, I get the behaviour I want:

struct TestJust<Output>: Publisher {
    typealias Failure = Never
    
    private let value: Output
    
    init(_ output: Output) {
        self.value = output
    }
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(value)
        subscriber.receive(completion: .finished)
    }
}

TestJust("Some text")
    .map { _ in
        print("Map: \(Thread.isMainThread)")
    }
    .subscribe(on: DispatchQueue.global())
    .sink { _ in
        print("Sink: \(Thread.isMainThread)")
    }
    .store(in: &subscriptions)

Output:

Map: false
Sink: false

So either I completely misunderstand these mechanisms, or some publishers intentionally choose the thread on which they publish values (Just and Sequence on the main thread, URLSession.DataTaskPublisher on a background thread). The latter does not make sense to me, because in that case what would we need subscribe(on:options:) for?

Could you please help me understand what I am missing? Thank you in advance.

Oleg Kosenko asked Jan 22 '21 18:01


1 Answer

The first thing to understand is that messages flow both up a pipeline and down a pipeline. Messages that flow up a pipeline ("upstream") are:

  • The actual performance of the subscription (receive subscription)

  • Requests from a subscriber to the upstream publisher asking for a new value

  • Cancel messages (these percolate upwards from the final subscriber)

Messages that flow down a pipeline ("downstream") are:

  • Values

  • Completions, consisting of either a failure (error) or completion-in-good-order (reporting that the publisher emitted its last value)
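To make the two directions concrete, here is a minimal sketch of a hand-written subscriber. The LoggingSubscriber class and its log property are hypothetical names used only for illustration; the three receive methods are the ones the Subscriber protocol requires. The only upstream message it sends is the request; everything else it records is arriving from upstream.

```swift
import Combine

// Hypothetical subscriber that records every message it sees, in order.
final class LoggingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var log: [String] = []

    // Downstream: the publisher hands us a subscription.
    func receive(subscription: Subscription) {
        log.append("subscription")
        // Upstream: ask the publisher for values.
        log.append("request")
        subscription.request(.unlimited)
    }

    // Downstream: a value arrives.
    func receive(_ input: Int) -> Subscribers.Demand {
        log.append("value \(input)")
        return .none   // no additional demand beyond the initial request
    }

    // Downstream: the publisher reports that it has finished.
    func receive(completion: Subscribers.Completion<Never>) {
        log.append("completion")
    }
}

let loggingSubscriber = LoggingSubscriber()
[1, 2, 3].publisher.subscribe(loggingSubscriber)
print(loggingSubscriber.log)
```

A cancel message would travel the same way as the request: the subscriber calls cancel() on the subscription it was given.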

As the documentation states, subscribe(on:) is about the former: messages that flow upstream. But you are not tracking any of those messages in your tests, so none of your results tell you anything about them. To watch messages flow up the pipeline, insert a handleEvents operator above the subscription point and implement, for example, its receiveRequest: parameter:

Just("Some text")
    .handleEvents(receiveRequest: { _ in
        print("Handle1: \(Thread.isMainThread)")
    })
    .map // etc.
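A fuller version of that experiment might look like the sketch below. It assumes a command-line script rather than an app, so it blocks on a semaphore until the pipeline completes; runUpstreamDemo and the returned tuple labels are hypothetical names. Because handleEvents sits above subscribe(on:), I would expect both the subscription and the request to be handled off the main thread.

```swift
import Combine
import Foundation

// Hypothetical helper: reports whether the upstream events were handled on the main thread.
func runUpstreamDemo() -> (subscriptionOnMain: Bool, requestOnMain: Bool) {
    var subscriptionOnMain = true
    var requestOnMain = true
    let done = DispatchSemaphore(value: 0)

    let cancellable = Just("Some text")
        .handleEvents(
            receiveSubscription: { _ in subscriptionOnMain = Thread.isMainThread },
            receiveRequest: { _ in requestOnMain = Thread.isMainThread }
        )
        .map { $0.count }
        .subscribe(on: DispatchQueue.global())
        .sink(receiveCompletion: { _ in done.signal() }, receiveValue: { _ in })

    // Block the calling thread until the completion arrives from the background queue.
    done.wait()
    cancellable.cancel()
    return (subscriptionOnMain, requestOnMain)
}

let upstreamResult = runUpstreamDemo()
print("Subscription on main thread: \(upstreamResult.subscriptionOnMain)")
print("Request on main thread: \(upstreamResult.requestOnMain)")
```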

Meanwhile, you should make no assumptions about the thread on which messages will flow downstream (i.e. values and completions). You say:

Also, what I got from different articles is that unless we explicitly specify the Scheduler to receive our downstream messages on (using receive(on:options:)), messages will be sent on the Scheduler used for receiving a subscription.

But that seems like a bogus assumption. And nothing about your code determines the downstream-sending thread in a clear way. As you rightly say, you can take control of this with receive(on:), but if you don't, I would say you must assume nothing about the matter. Some publishers certainly do produce a value on a background thread, such as the data task publisher, which makes perfect sense (the same thing happens with a data task completion handler). Others don't.
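One concrete illustration of why the first listing is hard to reason about: as far as I know, Just declares its own map overload that returns another Just rather than a Publishers.Map, so the closure runs while the pipeline is being assembled, on whatever thread does the assembling, before anything has subscribed. The sketch below checks that; runEagerMapDemo and the tuple labels are hypothetical names, and the behaviour is an observation about the current framework, not something to build on.

```swift
import Combine

// Hypothetical helper: builds Just + map without ever subscribing to it.
func runEagerMapDemo() -> (ranBeforeSubscribe: Bool, isPlainJust: Bool) {
    var mapRan = false

    let publisher = Just("Some text").map { text -> Int in
        mapRan = true
        return text.count
    }

    // No subscriber has been attached at this point.
    let ranBeforeSubscribe = mapRan
    // Compare the static result type against Just<Int>.
    let isPlainJust = type(of: publisher) == Just<Int>.self
    return (ranBeforeSubscribe, isPlainJust)
}

let eagerResult = runEagerMapDemo()
print("Map closure ran before any subscription: \(eagerResult.ranBeforeSubscribe)")
print("Result is a plain Just<Int>: \(eagerResult.isPlainJust)")
```

If that holds, a subscribe(on:) placed after such a map cannot affect where the closure runs, which is consistent with the custom TestJust publisher behaving differently.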

What you can assume is that operators other than receive(on:) will not generally alter the value-passing thread. But whether and how an operator will use the subscription thread to determine the receive thread, that is something you should assume nothing about. To take control of the receive thread, take control of it! Call receive(on:) or assume nothing.

Just to give an example, if you change your opening to

Just("Some text")
    .receive(on: DispatchQueue.main)

then both your map and your sink will report that they are receiving values on the main thread. Why? Because you took control of the receive thread. This works regardless of what you may say in any subscribe(on:) commands. They are different matters entirely.
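Here is a sketch of the same idea that can run as a command-line script. It swaps DispatchQueue.main for a private serial queue (an assumption made so the script can block while waiting, which it could not do if delivery needed the main queue) and tags that queue with a DispatchSpecificKey so the closures can tell where they are running. runReceiveOnDemo, the queue label, and the tuple labels are hypothetical names.

```swift
import Combine
import Foundation

// Hypothetical helper: reports whether map and sink ran on the delivery queue.
func runReceiveOnDemo() -> (mapOnDeliveryQueue: Bool, sinkOnDeliveryQueue: Bool) {
    let deliveryQueue = DispatchQueue(label: "demo.delivery")   // stand-in for DispatchQueue.main
    let key = DispatchSpecificKey<Bool>()
    deliveryQueue.setSpecific(key: key, value: true)

    var mapOnQueue = false
    var sinkOnQueue = false
    let done = DispatchSemaphore(value: 0)

    let cancellable = Just("Some text")
        .receive(on: deliveryQueue)
        .map { text -> Int in
            mapOnQueue = DispatchQueue.getSpecific(key: key) ?? false
            return text.count
        }
        .subscribe(on: DispatchQueue.global())
        .sink(
            receiveCompletion: { _ in done.signal() },
            receiveValue: { _ in
                sinkOnQueue = DispatchQueue.getSpecific(key: key) ?? false
            }
        )

    // Wait for the completion, which is also delivered on deliveryQueue.
    done.wait()
    cancellable.cancel()
    return (mapOnQueue, sinkOnQueue)
}

let receiveOnResult = runReceiveOnDemo()
print("Map ran on delivery queue: \(receiveOnResult.mapOnDeliveryQueue)")
print("Sink ran on delivery queue: \(receiveOnResult.sinkOnDeliveryQueue)")
```

The subscribe(on:) call is left in deliberately: the subscription work happens on a global queue, yet values still reach map and sink on the queue named in receive(on:).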

Maybe if you call subscribe(on:) but you don't call receive(on:), some things about the downstream-sending thread are determined by the subscribe(on:) thread, but I sure wouldn't rely on there being any hard and fast rules about it; there's nothing saying that in the documentation! Instead, don't do that. If you call subscribe(on:), call receive(on:) too so that you are in charge of what happens.

matt answered Nov 12 '22 14:11