Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
496 views
in Technique[技术] by (71.8m points)

java - Processing each flux item in a dedicated transaction with Spring WebFlux, Reactor, R2DBC

I'm working on a Spring Boot 2.4 webapp with WebFlux, Spring data R2DBC and PostgreSQL. Actually, I don't succeed to process each item in a Flux in a dedicated transaction. The use case seems to me quite usual:

  • Retrieve items from a remote API,
  • Configure the returned Flux instance so as each item is processed in a dedicated transaction.
  • If an item's transaction fails, it must not rollback other items' transaction. This is also true for commits.

Actually, I use the code below, but things are not working as I expect, probably because I misunderstand how reactive contexts are used:

public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
        .flatMap(this::processItem);
}

@Transactional
public Mono<Item> processItem(Item item) {
    // Process the item asynchronously, potentially updating a PostgreSQL
}

When the flux emits 2 items, actually, I can see in the logs the R2dbcTransactionManager creates a transaction for each item, in a distinct thread. But after that, any action on one of the transaction (commit/rollback) is performed on the other transaction also, both in the same thread. For example, in the logs below, one item processing failed and it rollbacks both transactions:

DEBUG [        scheduling-1] b.w.d.u.c.m.ItemProcessor       : Starting item processing
DEBUG [    boundedElastic-3] b.w.d.u.c.m.ItemProcessor       : Item: 35ZRNT9RVOQK
DEBUG [    boundedElastic-2] b.w.d.u.c.m.ItemProcessor       : Item: 3XDSWAMB38KB
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [    boundedElastic-2] o.s.r.c.R2dbcTransactionManager : Creating new transaction with name [b.w.d.u.c.m.ItemProcessor.processItem]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]] to manual commit
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Acquired Connection [MonoRetry] for R2DBC transaction
DEBUG [    boundedElastic-3] o.s.r.c.R2dbcTransactionManager : Switching R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] to manual commit
ERROR [   reactor-tcp-nio-1] b.w.d.u.c.m.ItemProcessor       : Unknown item: 3XDSWAMB38KB
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]]
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Releasing R2DBC Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@264df4f5, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@5074cdea}]] after transaction
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Initiating transaction rollback
DEBUG [   reactor-tcp-nio-1] o.s.r.c.R2dbcTransactionManager : Rolling back R2DBC transaction on Connection [PooledConnection[PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@70a7492a, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@7d2961f9}]]

I highly suspect processing items on separate threads is not enough to start a transactional context for each one.

What is the proper way to have each item of a flux processed in its own transaction, and let some item transactions committed while other are rolled back?

Thanks in advance for your help! Regards

EDIT

I think I found out why this implementation is not working.

Actually when an item processing fails, the transaction is rolled back as expected, and the error signal is propagated to the origin flux, which fails in turn. This is not the desired behaviour of course, and the error signal must be catched to ensure other items in the flux are processed independently. The flux must complete successfully whatever happens for each item.

The code below allows to do that:

public ParallelFlux<Item> processAllItems() {
    LOGGER.debug("Starting item processing");
    return client.findItems()
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .doOnNext(item -> LOGGER.debug("Item: {}", item.getId()))
        .flatMap(item -> processItem(item)
            .onErrorResume(t -> Mono.<Item>empty().doOnSuccess(data -> LOGGER.error("Item skipped after error", t)
        );
}

The only thing that still I am not understanding clearly is how the transactional context for each item is handled in each publisher and its reactive context: when errors are not catched with onErrorResume, the error signal rolls back other transactions. In Spring Data R2DBC, I didn't find implementation notes, especially about the ReactiveTransactionManager and R2dbcTransactionManager implementations.

Feel free to provide details about this point particularly! Thanks in advance! Regards


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...