I'm working on a Spring Boot 2.4 webapp with WebFlux, Spring Data R2DBC and PostgreSQL. I haven't managed to process each item of a Flux in its own dedicated transaction. The use case seems fairly common to me:
- Retrieve items from a remote API,
- Configure the returned Flux so that each item is processed in a dedicated transaction,
- If an item's transaction fails, it must not roll back the other items' transactions; the same goes for commits.
I currently use the code below, but it does not behave as I expect, probably because I misunderstand how reactive contexts work:
public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
        .flatMap(this::processItem);
}

@Transactional
public Mono<Item> processItem(Item item) {
    // Process the item asynchronously, potentially updating PostgreSQL
}
When the flux emits 2 items, I can see in the logs that R2dbcTransactionManager creates a transaction for each item, each on a distinct thread. But after that, any action on one of the transactions (commit/rollback) is also performed on the other transaction, both on the same thread. For example, in the logs below, one item's processing failed and both transactions are rolled back:
DEBUG [ scheduling-1] b.w.d.u.c.m.ItemProcessor : Starting item processing
DEBUG [ boundedElastic-3] b.w.d.u.c.m.ItemProcessor : Item: 35ZRNT9RVOQK
DEBUG [ boundedElastic-2] b.w.d.u.c.m.ItemProcessor : Item: 3XDSWAMB38KB
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [ boundedElastic-2] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]] to manual commit
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [ boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] to manual commit
ERROR [ reactor-tcp-nio-1] b.w.d.u.c.m.ItemProcessor : Unknown item: 3XDSWAMB38KB
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]]
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Releasing R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] after transaction
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [ reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]]
I strongly suspect that processing items on separate threads is not enough to start a transactional context for each one.
What is the proper way to have each item of a flux processed in its own transaction, so that some items' transactions can be committed while others are rolled back?
Thanks in advance for your help!
Regards
EDIT
I think I found out why this implementation does not work.
When an item's processing fails, its transaction is rolled back as expected, but the error signal is propagated to the origin flux, which fails in turn. This is of course not the desired behaviour: the error signal must be caught so that the other items in the flux are processed independently. The flux must complete successfully whatever happens to each item.
The code below does that:
public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
        .flatMap(item -> processItem(item)
            .onErrorResume(t -> {
                LOGGER.error("Item skipped after error", t);
                return Mono.empty();
            }));
}
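Stripped of the Spring and database pieces, the isolation behaviour can be reproduced with plain Reactor (a minimal sketch; process() is a hypothetical stand-in for processItem()):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorIsolationDemo {

    // Hypothetical stand-in for processItem(): fails for one specific id
    static Mono<String> process(String id) {
        return "BAD".equals(id)
                ? Mono.error(new IllegalStateException("Unknown item: " + id))
                : Mono.just(id);
    }

    public static void main(String[] args) {
        List<String> processed = Flux.just("A", "BAD", "B")
                .flatMap(id -> process(id)
                        // swallow the per-item error so the outer flux survives
                        .onErrorResume(t -> Mono.empty()))
                .collectList()
                .block();
        System.out.println(processed); // the failing item is simply skipped
    }
}
```

Without the inner onErrorResume, the collectList() would terminate with the error instead of emitting the surviving items.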
The only thing I still do not understand clearly is how the transactional context of each item is handled in each publisher and its reactive context: when errors are not caught with onErrorResume, the error signal rolls back the other transactions as well. I did not find implementation notes for Spring Data R2DBC, especially about the ReactiveTransactionManager and R2dbcTransactionManager implementations.
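For what it's worth, my current mental model, sketched with plain Reactor (this is an assumption on my part, not Spring's actual implementation): reactive transaction scoping behaves like Mono.usingWhen, which commits on completion and rolls back on error or cancellation. Presumably, when one rail of the ParallelFlux errors, the sibling rails are cancelled, and that cancellation triggers the rollback of their in-flight transactions. The inTx() helper and the event list below are purely illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

public class TxScopeDemo {

    // Records what the illustrative "transaction manager" does
    static final List<String> events = new ArrayList<>();

    // Hypothetical helper: scopes `work` the way a reactive transaction is scoped,
    // committing on completion and rolling back on error or cancellation
    static <T> Mono<T> inTx(String tx, Mono<T> work) {
        return Mono.usingWhen(
                Mono.fromCallable(() -> { events.add("begin " + tx); return tx; }),
                name -> work,
                name -> Mono.fromRunnable(() -> events.add("commit " + name)),
                (name, err) -> Mono.fromRunnable(() -> events.add("rollback " + name)),
                name -> Mono.fromRunnable(() -> events.add("rollback-on-cancel " + name)));
    }

    public static void main(String[] args) {
        inTx("tx1", Mono.just("ok")).block();
        inTx("tx2", Mono.error(new RuntimeException("boom")))
                .onErrorResume(t -> Mono.empty())
                .block();
        System.out.println(events); // [begin tx1, commit tx1, begin tx2, rollback tx2]
    }
}
```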
Feel free to provide details about this point particularly!
Thanks in advance!
Regards