I have this publisher and subscribers (example code):
import Combine
let publisher = PassthroughSubject<ComplexStructOrClass, Never>()
let sub1 = publisher.sink { (someString) in
// Async work...
}
let sub2 = publisher.sink { (someString) in
// Async work, but it has to wait until sub1 has finished his work
}
So the publisher
constant has 2 subscribers. When I use the method send
on the publisher
constant, it should send the value first to sub1
and after sub1
finished processing (with a callback or something like that), publisher
should and notify sub2
.
So in the comments its stated that Combine is made for this. What publisher do I need to use? A PassthroughSubject may be the wrong decision.
Usecase
I need to publish values throughout the lifetime of my app to a dynamic number of subscribers, for a few different publishers (I hope I can make a protocol). So a subscriber can be added and removed from a publisher at any given time. A subscriber look as follows:
NSPersistentContainer
A callback should be made by the publisher when a new value has arrived. That process looks like:
Current code, no Combine
This is some code without using Combine
:
// My publisher
protocol NotiPublisher {
// Type of message to send
associatedtype Notification
// List of subscribers for this publisher
static var listeners: Set<AnyNotiPublisher<Notification>> { get set }
}
// My subscriber
protocol NotificationListener: Hashable {
associatedtype NotificationType
var container: NSPersistentContainer { get }
// Identifier used to find this subscriber in the list of 'listeners' in the publisher
var identifier: Int32 { get }
var notify: ((_ notification: NotificationType, _ context: NSManagedObjectContext, @escaping CompletionHandlerAck) -> ()) { get }
}
// Type erased version of the NotificationListener and some convience methods here, can add them if desired
// In a extension of NotiPublisher, this method is here
static func notify(queue: DispatchQueue, notification: Notification, completionHander: @escaping CompletionHandlerAck) throws {
let dispatchGroup = DispatchGroup()
var completionBlocks = [SomeCompletionHandler]()
var contexts = [NSManagedObjectContext]()
var didLoop = false
for listener in listeners {
if didLoop {
dispatchGroup.wait()
} else {
didLoop = true
}
dispatchGroup.enter()
listener.container.performBackgroundTask { (context) in
contexts.append(context)
listener.notify(notification, context, { (completion) in
completionBlocks.append(completion)
dispatchGroup.leave()
})
}
}
dispatchGroup.notify(queue: queue) {
let err = completion.first(where: { element in
// Check if an error has occured
})
if err == nil {
for context in contexts {
context.performAndWait {
try! context.save()
}
}
}
completionHander(err ?? .ok(true))
}
}
This is pretty complex code, I am wondering if I can make use of the power of Combine to make this code more readable.
I wrote the following to chain async operations from a publisher using flatMap that allows you to return another publisher. I'm not a fan, and it might not meet your need to dynamically change the subs, but it might help someone:
let somePublisher = Just(12)
let anyCancellable = somePublisher.flatMap{ num in
//We can return a publisher from flatMap, so lets return a Future one because we want to do some async work
return Future<Int,Never>({ promise in
//do an async thing using dispatch
DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
print("flat map finished processing the number \(num)")
//now just pass on the value
promise(.success(num))
})
})
}.flatMap{ num in
//We can return a publisher from flatMap, so lets return a Future one because we want to do some async work
return Future<Int,Never>({ promise in
//do an async thing using dispatch
DispatchQueue.main.asyncAfter(deadline: .now() + 3, execute: {
print("second flat map finished processing the number \(num)")
//now just pass on the value
promise(.success(num))
})
})
}.sink { num in
print("This sink runs after the async work in the flatMap/Future publishers")
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With