I have a custom pipeline where I want to have 3 retry attempt for some error codes which are recoverable plus I want to add some short delay for the recoverable error. Anyone has an idea how I can do it?
func createRequest(for message: Message) -> AnyPublisher<ResponseMessage, Error> {
Future<ResponseMessage, Error> { promise in
.....
}
.tryCatch({ error -> AnyPublisher<ResponseMessage, Error> in
// If error is a recoverable error retry, otherwise fail directly
if case let MessageBusError.messageError(responseError) = error {
if responseError.isRecoverable {
// Make a next attempt only for recoverable error
throw error
}
}
//Should fail directly if the error code is not recoverable
return Fail<ResponseMessage, Error>(error: error)
.eraseToAnyPublisher()
})
.retry(3)
.eraseToAnyPublisher()
}
Basically, you need a retryIf
operator, so you can provide a closure to tell Combine which errors should be retried, and which not. I'm not aware of such an operator, but it's not hard to build one for yourself.
The idiomatic way is to extend the Publishers
namespace with a new type for your operator, and then extend Publisher
to add support for that operator so that yo can chain it along with other operators.
The implementation could look like this:
extension Publishers {
struct RetryIf<P: Publisher>: Publisher {
typealias Output = P.Output
typealias Failure = P.Failure
let publisher: P
let times: Int
let condition: (P.Failure) -> Bool
func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
guard times > 0 else { return publisher.receive(subscriber: subscriber) }
publisher.catch { (error: P.Failure) -> AnyPublisher<Output, Failure> in
if condition(error) {
return RetryIf(publisher: publisher, times: times - 1, condition: condition).eraseToAnyPublisher()
} else {
return Fail(error: error).eraseToAnyPublisher()
}
}.receive(subscriber: subscriber)
}
}
}
extension Publisher {
func retry(times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
Publishers.RetryIf(publisher: self, times: times, condition: condition)
}
}
Usage:
func createRequest(for message: Message) -> AnyPublisher<ResponseMessage, Error> {
Deferred {
Future<ResponseMessage, Error> { promise in
// future code
}
}
.retry(times: 3) { error in
if case let MessageBusError.messageError(responseError) = error, responseError.isRecoverable {
return true
}
return false
}
.eraseToAnyPublisher()
}
Note that I wrapped your Future
within a Deferred
one, otherwise the retry
operator would be meaningless, as the closure will not be executed multiple times. More details about that behaviour here: Swift. Combine. Is there any way to call a publisher block more than once when retry?.
Alternatively, you can write the Publisher
extension like this:
extension Publisher {
func retry(_ times: Int, if condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
Publishers.RetryIf(publisher: self, times: times, condition: condition)
}
func retry(_ times: Int, unless condition: @escaping (Failure) -> Bool) -> Publishers.RetryIf<Self> {
retry(times, if: { !condition($0) })
}
}
, which enables some funky stuff, like this:
extension Error {
var isRecoverable: Bool { ... }
var isUnrecoverable: Bool { ... }
}
// retry at most 3 times while receiving recoverable errors
// bail out the first time when encountering an error that is
// not recoverable
somePublisher
.retry(3, if: \.isRecoverable)
// retry at most 3 times, bail out the first time when
// an unrecoverable the error is encountered
somePublisher
.retry(3, unless: \.isUnrecoverable)
Or even funkier, ruby-style:
extension Int {
var times: Int { self }
}
somePublisher
.retry(3.times, unless: \.isUnrecoverable)
Typically, I try to avoid building new publishers, and instead prefer to compose publishers from built-in operators. I found it to be rather tricky to do here. Maybe someone can suggest a better approach.
Retry
resubscribes on any failure, so to trick it, I packaged any non-recoverable errors into a Result
value containing the error, but leaving recoverable errors as failures to .retry
; then eventually unpack the Result
back into the corresponding value/error.
Here's how it would work in your case:
func createRequest(for message: Message)-> AnyPublisher<ResponseMessage, Error> {
Future<ResponseMessage, Error> { promise in
.....
}
// pack a value into Result
.map { v -> Result<ResponseMessage, Error> in .success(v) }
.tryCatch { error -> AnyPublisher<Result<ResponseMessage, Error>, Error> in
if case let MessageBusError.messageError(responseError) = error {
if responseError.isRecoverable {
// Keep recoverable errors as failures
throw error
}
}
// pack a non-recoverable error into Result with a failure
return Just(.failure(error)).setFailureType(Error.self)
.eraseToAnyPublisher()
}
.retry(3)
// unpack back
.flatMap { result in result.publisher }
.eraseToAnyPublisher()
}
For completeness, to extend Publisher
with the above approach:
extension Publisher {
private func retryOnly<U: Publisher>(
upstream: U,
retries: Int,
when predicate: @escaping (U.Failure) -> Bool
) -> AnyPublisher<U.Output, U.Failure> {
upstream
.map { v -> Result<U.Output, U.Failure> in .success(v) }
.catch { err -> AnyPublisher<Result<U.Output, U.Failure>, U.Failure> in
if predicate(err) {
return Fail(error: err).eraseToAnyPublisher()
} else {
return Just(.failure(err))
.setFailureType(to: U.Failure.self)
.eraseToAnyPublisher()
}
}
.retry(retries)
.flatMap { result in result.publisher }
.eraseToAnyPublisher()
}
func retry(_ retries: Int, when predicate: @escaping (Failure) -> Bool)
-> AnyPublisher<Output, Failure> {
return retryOnly(upstream: self, retries: retries, when: predicate)
}
}
failingPublisher.retry(3, when: { $0 is RecoverableError })
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With