You have stumbled very beautifully right into the use case for the .buffer
operator. Its purpose is to compensate for .flatMap
backpressure by accumulating values that would otherwise be dropped.
I will illustrate by a completely artificial example:
class ViewController: UIViewController {
let sub = PassthroughSubject<Int,Never>()
var storage = Set<AnyCancellable>()
var timer : Timer!
override func viewDidLoad() {
super.viewDidLoad()
sub
.flatMap(maxPublishers:.max(3)) { i in
return Just(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)
var count = 0
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) {
_ in
count += 1
self.sub.send(count)
}
}
}
So, our publisher is emitting an incremented integer every second, but our flatMap
has .max(3)
and takes 3 seconds to republish a value. The result is that we start to miss values:
1
2
3
5
6
7
9
10
11
...
The solution is to put a buffer in front of the flatMap
. It needs to be large enough to hold any missed values long enough for them to be requested:
sub
.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
.flatMap(maxPublishers:.max(3)) { i in
The result is that all the numeric values do in fact arrive at the sink
. Of course in real life we could still lose values if the buffer is not large enough to compensate for disparity between the rate of value emission from the publisher and the rate of value emission from the backpressuring flatMap
.
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…