Control after cancel signal or error #747

Closed
akulik-belka opened this issue Mar 23, 2022 · 2 comments
Labels
for: stackoverflow A question that's better suited to stackoverflow.com

Comments

@akulik-belka

My team and I noticed a small quirk of the TransactionalOperator.
If a database operation is in progress and, at that moment, a cancel signal is sent or an error occurs in a parallel stream, control is returned only after the operation has completed and been rolled back.
If the cancel signal comes from outside (outside the context of the transactional operator), control is returned immediately.

class RollbackControlTest @Autowired constructor(
    private val database: R2dbcEntityTemplate,
    private val transactionManager: ReactiveTransactionManager
) {

    @Test
    fun `return control if stream in transaction`() {
        val timer = AtomicReference<Instant>()
        Mono
            .zip(
                dbCallTo2Seconds(),
                anotherIoCall500Millis().then<Int>(Mono.error(RuntimeException("test")))
            ) { a, b -> a + b }
            .doOnSubscribe { timer.set(Instant.now()) }
            .`as`(TransactionalOperator.create(transactionManager)::transactional)
            .onErrorResume { Mono.just(1) }
            .doOnSuccess { logger.info("Process take {} millis.", Duration.between(timer.get(), Instant.now()).toMillis()) } // Control is returned after 2 seconds; the transaction is rolled back
            .then()
            .test()
            .verifyComplete()
    }

    @Test
    fun `return control if db stream cancelled`() {
        val timer = AtomicReference<Instant>()
        Mono
            .zip(
                dbCallTo2Seconds().timeout(Duration.ofMillis(100)),
                anotherIoCall500Millis().then<Int>(Mono.error(RuntimeException("test")))
            ) { a, b -> a + b }
            .doOnSubscribe { timer.set(Instant.now()) }
            .`as`(TransactionalOperator.create(transactionManager)::transactional)
            .onErrorResume { Mono.just(1) }
            .doOnSuccess { logger.info("Process take {} millis.", Duration.between(timer.get(), Instant.now()).toMillis()) } // Control is returned after 2 seconds; the transaction is rolled back
            .then()
            .test()
            .verifyComplete()
    }

    @Test
    fun `without transaction operator`() {
        val timer = AtomicReference<Instant>()
        Mono
            .zip(
                dbCallTo2Seconds().timeout(Duration.ofMillis(100)),
                anotherIoCall500Millis().then<Int>(Mono.error(RuntimeException("test")))
            ) { a, b -> a + b }
            .doOnSubscribe { timer.set(Instant.now()) }
            .onErrorResume { Mono.just(1) }
            .doOnSuccess { logger.info("Process take {} millis.", Duration.between(timer.get(), Instant.now()).toMillis()) } // Control is returned after 100 millis, immediately after the timeout
            .then()
            .test()
            .verifyComplete()
    }

    @Test
    fun `return control after transaction`() {
        val timer = AtomicReference<Instant>()
        Mono
            .zip(
                dbCallTo2Seconds(),
                anotherIoCall500Millis().then<Int>(Mono.error(RuntimeException("test")))
            ) { a, b -> a + b }
            .`as`(TransactionalOperator.create(transactionManager)::transactional)
            .doOnSubscribe { timer.set(Instant.now()) }
            .timeout(Duration.ofMillis(100))
            .onErrorResume { Mono.just(1) }
            .doOnSuccess { logger.info("Process take {} millis.", Duration.between(timer.get(), Instant.now()).toMillis()) } // Control is returned after 100 millis, immediately after the timeout
            .then()
            .test()
            .verifyComplete()
    }

    private fun dbCallTo2Seconds(): Mono<Int> {
        return database.databaseClient.sql("SELECT SLEEP(2);")
            .then()
            .thenReturn(1)
    }

    fun anotherIoCall500Millis(): Mono<Int> {
        return Mono.delay(Duration.ofMillis(500))
            .thenReturn(1)
    }

    companion object {
        private val logger = LoggerFactory.getLogger(RollbackControlTest::class.java)
    }
}

  • In case of a cancel signal outside the transaction operator, will there be an honest rollback?
  • Do we understand correctly that this behavior is expected?
  • Can we expect safe behavior without leaks?
  • How safe is it to use timeouts for operations and how quickly will the connection return to the pool? Will there be problems in backpressure?
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 23, 2022
@mp911de
Member

mp911de commented Mar 24, 2022

Thanks for reaching out. The transaction manager now lives in Spring Framework's Spring R2DBC module.

To address your questions:

  1. In case of a cancel signal outside the transaction operator, will there be an honest rollback?

Yes, a rollback is going to happen. It might seem counterintuitive; however, we once had commit-on-cancel behavior (see spring-projects/spring-framework#25091 for the entire discussion) that caused partially committed transactions when, e.g., a connection reset triggered a cancel.

  2. Do we understand correctly that this behavior is expected?

Rolling back on cancel is expected. Cancel can happen due to take operators, but also when a downstream subscription gets canceled because of an error. Since we cannot distinguish between the two, rollback seems the safer and more reliable option.
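
As a rough illustration of why the two cases cannot be told apart, here is a minimal sketch using plain Reactor operators (no Spring, no database). The helper name `cancelSignals` is hypothetical, and `Flux.interval` / `Mono.delay` merely stand in for real upstream work. In both cases the upstream only observes a cancel signal, with no hint of what caused it.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SignalType
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: records the final signal that each upstream source observes.
fun cancelSignals(): List<SignalType> {
    val seen = CopyOnWriteArrayList<SignalType>()

    // Case 1: take(1) cancels the upstream once it has received one element.
    Flux.interval(Duration.ofMillis(10))
        .doFinally { seen.add(it) }
        .take(1)
        .blockLast()

    // Case 2: the second zip source fails after 50 ms, so zip cancels the first source.
    Mono.zip(
        Mono.delay(Duration.ofSeconds(2)).doFinally { seen.add(it) },
        Mono.delay(Duration.ofMillis(50)).then(Mono.error<Long>(IllegalStateException("test")))
    )
        .onErrorResume { Mono.empty() }
        .block()

    return seen
}

fun main() {
    // Both upstream sources are expected to report a cancel signal.
    println(cancelSignals())
}
```

From the point of view of the code sitting upstream, a deliberate `take` and an error elsewhere in the pipeline look the same, which is the ambiguity described above.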

  3. Can we expect safe behavior without leaks?
  4. How safe is it to use timeouts for operations and how quickly will the connection return to the pool? Will there be problems in backpressure?

These two go somewhat together. Canceling is a non-blocking signal towards the upstream subscription. Once a subscription is canceled, we can no longer await any kind of completion. This means that a rollback happens concurrently, while the actual request processing might have terminated already. Once the cancellation is complete, connections are put back into the pool.
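
To make the "rollback happens concurrently" point more concrete, here is a small sketch built on Reactor's `Mono.usingWhen`. It is only a stand-in for a transaction lifecycle, not Spring's actual implementation: the `"tx"` resource, the event names, and the helper `cleanupOnCancel` are all hypothetical, and the delays are arbitrary. The downstream `timeout` cancels the resource while it is in use, and the asynchronous cancel handler (the pretend rollback) finishes after control has already returned to the caller.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns the order in which the events were observed.
fun cleanupOnCancel(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val rolledBack = CountDownLatch(1)

    Mono.usingWhen(
        Mono.just("tx"),                                             // pretend transaction
        { _: String -> Mono.delay(Duration.ofSeconds(2)) },          // slow work inside it
        { _: String -> Mono.fromRunnable<Void> { events.add("commit") } },
        { _: String, _: Throwable -> Mono.fromRunnable<Void> { events.add("rollback-on-error") } },
        { _: String ->                                               // runs on cancel
            Mono.delay(Duration.ofMillis(200)).doOnNext {
                events.add("rollback-on-cancel")
                rolledBack.countDown()
            }
        }
    )
        .timeout(Duration.ofMillis(100))   // cancels the upstream after 100 ms
        .onErrorResume { Mono.just(-1L) }
        .block()

    events.add("control-returned")
    // Wait for the asynchronous cleanup so that the result is complete.
    rolledBack.await(2, TimeUnit.SECONDS)
    return events
}

fun main() {
    println(cleanupOnCancel())
}
```

On a typical run, `control-returned` is recorded before `rollback-on-cancel`, and `commit` never appears. The caller gets control back as soon as the timeout fires, while the cleanup completes on its own afterwards.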

@mp911de mp911de added for: stackoverflow A question that's better suited to stackoverflow.com and removed status: waiting-for-triage An issue we've not yet triaged labels Mar 24, 2022
@akulik-belka
Author

Thanks a lot for the replies. They will help us a lot in the further design of the system.
