Do you have any ideas that could help me simplify this stream?
The main challenge is to check, in a reactive way, whether a queue has any consumers and, if it has none, whether there is at least one message on any of the queues.
Mono<String> resumeFullSync(FullSyncContext fullSyncContext,
                            Function<FullSyncContext, Mono<Void>> finalizeFullSyncCallback) {
    var fullSyncSpec = new FullSyncSpecification(fullSyncProperties, fullSyncContext);
    return sender.declare(fullSyncSpec.getQueueSpecification())
        .flatMap(declareOk -> {
            if (declareOk.getConsumerCount() == 0) {
                log.debug("Queue has no consumers ({})", fullSyncContext.getCategoryName());
                return Flux.concat(Mono.just(declareOk),
                        sender.declare(fullSyncSpec.getRetry1QueueSpecification()),
                        sender.declare(fullSyncSpec.getRetry2QueueSpecification()),
                        sender.declare(fullSyncSpec.getRetry3QueueSpecification()))
                    .any(d -> d.getMessageCount() > 0)
                    .flatMap(messagesExist -> {
                        if (messagesExist) {
                            log.debug("Queues have some messages ({})", fullSyncContext.getCategoryName());
                            return sender.declare(fullSyncSpec.getExchangeSpecification())
                                .then(sender.bind(fullSyncSpec.getBindingSpecification()))
                                .then(sender.bind(fullSyncSpec.getRetry1BindingSpecification()))
                                .then(sender.bind(fullSyncSpec.getRetry2BindingSpecification()))
                                .then(sender.bind(fullSyncSpec.getRetry3BindingSpecification()))
                                .doOnNext(b -> setupFullSyncConsumer(fullSyncSpec, finalizeFullSyncCallback));
                        } else {
                            log.debug("Queues have no messages ({}). Stopping", fullSyncContext.getCategoryName());
                            return Mono.empty();
                        }
                    });
            } else {
                log.debug("Queue has consumers ({}). Stopping", fullSyncContext.getCategoryName());
                return Mono.empty();
            }
        })
        .thenReturn(fullSyncContext.getCategoryName());
}
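One possible way to think about the simplification is to replace the two nested if/else blocks with a chain of filters: keep the main queue only when it has no consumers, then keep it only when some queue holds a message. The sketch below models just that decision with the plain JDK so it can be run without a broker. `ResumeDecision`, `QueueInfo` and `shouldResume` are hypothetical names that are not part of the original code, and `QueueInfo` only stands in for the consumer and message counts read from a declare result. In Reactor the same shape would probably be `Mono.filter` on the first declare result followed by `Flux.any` over the retry declarations, but that variant is not shown or tested here.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ResumeDecision {

    /** Hypothetical stand-in for the counts carried by a queue declare result. */
    record QueueInfo(int consumerCount, int messageCount) {}

    /**
     * Returns true when a consumer should be set up: the main queue has no
     * consumers and at least one queue (main or retry) holds a message.
     * Retry queues are passed as suppliers so each one is looked up only if
     * the earlier queues were empty.
     */
    static boolean shouldResume(QueueInfo mainQueue, List<Supplier<QueueInfo>> retryQueues) {
        return Optional.of(mainQueue)
                // First condition: stop when somebody already consumes the queue.
                .filter(q -> q.consumerCount() == 0)
                // Second condition: at least one message on any queue.
                // anyMatch short-circuits, so later suppliers are not called.
                .filter(q -> q.messageCount() > 0
                        || retryQueues.stream()
                                .map(Supplier::get)
                                .anyMatch(r -> r.messageCount() > 0))
                .isPresent();
    }

    public static void main(String[] args) {
        AtomicInteger lookups = new AtomicInteger();
        // Each supplier counts how often a retry queue is actually inspected.
        List<Supplier<QueueInfo>> retries = List.of(
                () -> { lookups.incrementAndGet(); return new QueueInfo(0, 0); },
                () -> { lookups.incrementAndGet(); return new QueueInfo(0, 3); },
                () -> { lookups.incrementAndGet(); return new QueueInfo(0, 0); });

        System.out.println(shouldResume(new QueueInfo(0, 0), retries));
        System.out.println("retry queues inspected: " + lookups.get());
    }
}
```

The suppliers mirror the way the original `Flux.concat(...).any(...)` subscribes to the declare calls one after another and stops once a queue with messages is found. Logging in each branch is left out of the sketch; keeping the log statements is one reason the explicit if/else form in the question may still be preferable.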
question from:
https://stackoverflow.com/questions/65892097/simplify-spring-reactor-stream-checking-if-at-least-one-condition-is-true-and