Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combine: Publisher sometimes loses value and completes

I have a simple Deferred Publisher that reads data from disk and I display the data in a SwiftUI List, the Publisher works well most of the time, but sometimes it doesn't behave well, it just loses its value (which's an array of Model objects) and completes with finished message. I've tried a workaround mentioned here to use the buffer operator to keep the value in buffer because I believe the Combine's Publisher by design won't pass the data downstream if there is no demand requested by subscribers and hence dropping this data and completes, however using buffer didn't solve the issue.

The code I have:

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .receive(on: DispatchQueue.main)
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }) { array in
                self.modelArray = array
        }.store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
            Deferred {
                 Future { promise in
                    guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                      else {
                        promise(.failure(.someError))
                        return
                    }
                      promise(.success(modelArray))
                }
            }
           .receive(on: self.readQueue)
           .eraseToAnyPublisher()
        }
}

This is a link to download a working sample project.

EDIT:

Environment: Xcode 11.3.1, iOS 13.3 iPhone 11 Pro Max simulator and device.

gif screenshot (notice the console output)

enter image description here

EDIT2:

if I add any downstream publishers, like combineLatest for example just before sink in the consumer function readData() then a new behavior introduced, which's chaining an async publisher (readFromBundle) with a sync publisher (combineLatest) will result in the value will not deliver at all on iOS 13.3+ devices and will sometimes deliver on devices below iOS 13.3, as stated on this link.

like image 732
JAHelia Avatar asked Nov 25 '25 05:11

JAHelia


2 Answers

It looks like racing-kind issue, please try the following (just by code-reading)

1) use background queue explicitly

private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background, 
    attributes: .concurrent)

2) schedule Publisher on this queue instead of receiving on it

.subscribe(on: self.readQueue)
like image 103
Asperi Avatar answered Nov 27 '25 18:11

Asperi


Let see the documentation about .receive(on:)

Specifies the scheduler on which to receive elements from the publisher. Declaration

func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler

Discussion

You use the receive(on:options:) operator to receive results on a specific scheduler, such as performing UI work on the main run loop. In contrast with subscribe(on:options:), which affects upstream messages, receive(on:options:) changes the execution context of downstream messages. In the following example, requests to jsonPublisher are performed on backgroundQueue, but elements received from it are performed on RunLoop.main.

let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.

jsonPublisher
    .subscribe(on: backgroundQueue)
    .receiveOn(on: RunLoop.main)
    .subscribe(labelUpdater)

Parameters

scheduler
The scheduler the publisher is to use for element delivery. options Scheduler options that customize the element delivery. Returns

A publisher that delivers elements using the specified scheduler.

in your case it means

import SwiftUI
import Combine

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }) { array in
                print("received value")
                self.modelArray = array
        }.store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle (resource: String, type:String) -> AnyPublisher<[Model], FileError> {
            Deferred {
                 Future { promise in
                    guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                      else {
                        promise(.failure(.someError))
                        return
                    }
                      promise(.success(modelArray))
                }
            }
            .subscribe(on: readQueue)
            .receive(on: RunLoop.main)
           .eraseToAnyPublisher()
        }
}

which explain, why Asperi's solution works. The difference is, that there is not necessary to call .receive(on:) again in readData()

the difference between DispatchQueue.main and RunLoop.main are not significant in your example.

like image 26
user3441734 Avatar answered Nov 27 '25 19:11

user3441734



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!