I want to run an asynchronous job with Future, but the .sink() closures below never get called. It seems that the Future instance is released right after it is created.
Future<Int, Never> { promise in
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        promise(.success(1))
    }
}
.receive(on: DispatchQueue.main)
.sink(receiveCompletion: { completion in
    print(completion)
}, receiveValue: {
    print($0)
})
So I replaced the .sink() call with .subscribe(Subscribers.Sink()) as shown below, and it works fine. The problem is that I don't understand why it works. :( The two look the same to me. What is the difference between these two snippets? And when can I use .sink(), and when can't I?
Future<Int, Never> { promise in
    DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
        promise(.success(1))
    }
}
.receive(on: DispatchQueue.main)
.subscribe(Subscribers.Sink(receiveCompletion: { completion in
    print(completion)
}, receiveValue: {
    print($0)
}))
Thanks in advance.
The terms 'source' and 'sink' are commonly used to refer to the locations where whatever is flowing starts out (is produced) and ends up (is consumed), respectively. So heat flows from its source toward a heat sink, water flows down a (kitchen?) sink, and Combine reactive streams flow toward a .
Once we've created a publisher, we can then attach subscriptions to it, for example by using the sink API — which lets us pass a closure to be called whenever a new value was received, as well as one that'll be called once the publisher was completed: let cancellable = publisher.
If nothing holds on to that return value (by assigning or storing it), no strong reference keeps the cancellable alive, and it is released.
The .sink operator does three things:
1. It creates a Subscribers.Sink using the two closures you pass it.
2. It calls subscribe on the upstream Publisher, passing the Sink it created.
3. It creates an AnyCancellable that, when destroyed, cancels the Sink. It returns a reference to this AnyCancellable.
AnyCancellable is a reference-counted object. When the last reference to the AnyCancellable is destroyed, the AnyCancellable itself is destroyed. At that time, it calls its own cancel method.
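The three steps above can be sketched as a hand-written stand-in for .sink. This is only an illustration, not Combine's actual source: sketchSink, Recorder, and the PassthroughSubject used to drive it are hypothetical names chosen here so that the effect of discarding versus keeping the returned AnyCancellable can be seen synchronously.

```swift
import Combine

extension Publisher {
    /// Hypothetical stand-in for `.sink`, written out step by step.
    func sketchSink(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> AnyCancellable {
        // 1. Create a Subscribers.Sink from the two closures.
        let sink = Subscribers.Sink<Output, Failure>(
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        // 2. Subscribe the sink to the upstream publisher (self).
        subscribe(sink)
        // 3. Wrap the sink in an AnyCancellable that cancels it when destroyed.
        return AnyCancellable(sink)
    }
}

// Hypothetical helper that records what the closures receive.
final class Recorder { var values: [Int] = [] }

let recorder = Recorder()
let subject = PassthroughSubject<Int, Never>()

// The AnyCancellable is discarded, so it is destroyed at once and cancels the sink.
_ = subject.sketchSink(receiveCompletion: { _ in },
                       receiveValue: { recorder.values.append($0) })
subject.send(1) // not delivered: the subscription is already cancelled

// The AnyCancellable is kept in `token`, so the subscription stays alive.
let token = subject.sketchSink(receiveCompletion: { _ in },
                               receiveValue: { recorder.values.append($0) })
subject.send(2) // delivered
```

Only the second send reaches a receiveValue closure, because only the second AnyCancellable is still referenced when the value is sent.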
In your first example, you are not saving the AnyCancellable returned by .sink. So Swift destroys it immediately, which means it cancels the subscription immediately. One second later, your asyncAfter closure calls promise, but the subscription has already been cancelled, so your receiveValue closure is not called.
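One common way to keep the first example alive is to store the AnyCancellable somewhere that outlives the asynchronous work. The following is a minimal sketch under the assumption that the code lives in a long-lived object; the Loader class, its values property, and the start() method are hypothetical names introduced for illustration.

```swift
import Combine
import Foundation

// Hypothetical owner object; any long-lived object would do.
final class Loader {
    // Holding the AnyCancellable here keeps the subscription alive.
    private var cancellables = Set<AnyCancellable>()
    private(set) var values: [Int] = []

    func start() {
        Future<Int, Never> { promise in
            DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
                promise(.success(1))
            }
        }
        .receive(on: DispatchQueue.main)
        .sink(receiveCompletion: { completion in
            print(completion)
        }, receiveValue: { [weak self] value in
            print(value)
            self?.values.append(value)
        })
        // Store the returned AnyCancellable instead of discarding it.
        .store(in: &cancellables)
    }
}
```

The subscription now lives as long as the Loader does. When the Loader is deallocated, the set and its AnyCancellable are destroyed, which cancels any subscription that has not completed yet.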
In your second example, since you are creating the Subscribers.Sink object and passing it to subscribe yourself, no AnyCancellable is created to wrap the Sink. So nothing automatically destroys the subscription. One second later, the asyncAfter closure calls promise. Since the subscription wasn't destroyed, it still exists, so your receiveValue closure is called, and then your receiveCompletion closure is called.
So this is actually a very interesting use of Subscribers.Sink instead of the .sink operator. With .sink, you must save the returned AnyCancellable, else the subscription is cancelled immediately. But by using Subscribers.Sink directly, you create a subscription that lasts until it is completed, and you don't have to save anything. And when the subscription completes (with either .finished or .failure), the Sink discards the Subscription, which breaks the retain cycle that was keeping it alive, so the Sink and the Subscription are also destroyed, leaving no memory leaks.
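To see that a directly subscribed Subscribers.Sink keeps working with nothing saved, here is a small sketch that uses a PassthroughSubject in place of the Future so it runs synchronously. The names EventLog, eventLog, and numbers are hypothetical and exist only for this illustration.

```swift
import Combine

// Hypothetical helper that records what the closures receive.
final class EventLog { var entries: [String] = [] }

let eventLog = EventLog()
let numbers = PassthroughSubject<Int, Never>()

do {
    let sink = Subscribers.Sink<Int, Never>(
        receiveCompletion: { _ in eventLog.entries.append("completed") },
        receiveValue: { eventLog.entries.append("value \($0)") }
    )
    numbers.subscribe(sink)
    // `sink` goes out of scope here, and no AnyCancellable was ever created.
}

// The subscription is still active even though nothing was saved.
numbers.send(1)
numbers.send(completion: .finished)
```

If such a subscription ever needs to be stopped before it completes, note that Subscribers.Sink conforms to Cancellable, so you can keep your own reference to the sink and call cancel() on it.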