ConnectableFlux changes behavior between 3.2 and 3.3 #2028

Closed
davinkevin opened this issue Feb 3, 2020 · 8 comments · Fixed by #2031

davinkevin commented Feb 3, 2020

During an upgrade, I discovered a behavior change in ConnectableFlux. I've checked all the changelogs between 3.2.x and 3.3.x but found nothing that explains it.

I've set up a project with CI ready. You can compare the results of each build in the pipeline view here

The main problem seems to be linked to the replay(1) operator, which I use so that the flux always has at least one value available. When the bean is constructed, I call the ConnectableFlux#connect method from the constructor.

All the tests were passing, but after the migration to Reactor 3.3.2 (due to Spring Boot 2.2.4), I found a problem with this code.

The flux with the replay(1) operator seems to only emit the first value and nothing more.

The example I use in the repository is made with a TokenProvider with the following code (I've modified it to make it more testable, not real production code 😅):

class TokenProvider(
        key: String,
        clientId: String,
        private val webClient: WebClient,
        connectOnConstruction: Boolean = false
) {

    val tokenFlux = fetchTokenWithApiKey(AuthInformation(api_key = key, grant_type = "api_key", client_id = clientId))
            .replay(1)

    init {
        if (connectOnConstruction) {
            tokenFlux.connect()
        }
    }

    fun fetchTokenWithApiKey(info: AuthInformation): Flux<AuthJwtToken> {
        return fetchToken(info)
                .expand { fetchTokenAfterMidLife(info, it) }
                .retryWhen { it.flatMap { Mono.delay(ofSeconds(1)) } }
                .log("fetchTokenWithApiKey")
    }

    private fun fetchTokenAfterMidLife(authInformation: AuthInformation, token: AuthJwtToken): Mono<AuthJwtToken> {
        return Mono.delay(between(Instant.now(), token.midLife))
                .flatMap { fetchToken(authInformation) }
    }

    private fun fetchToken(authInformation: AuthInformation): Mono<AuthJwtToken> {
        return webClient
                .post()
                .uri("/auth/with/api/key")
                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
                .accept(MediaType.APPLICATION_JSON)
                .body(BodyInserters.fromFormData(authInformation.toFormData()))
                .retrieve()
                .bodyToMono<AuthJwtToken>()
    }
}

I've removed all the log() operators and log statements from the code shown here. You can find the real version here

I've included 4 unit tests on this code, which all fail on reactor-core 3.3.2.

class TokenProviderTest {

    private val client = WebClient.builder().baseUrl("http://localhost:9876/").build()

    @Test
    @Timeout(5)
    fun `should renew token after midlife by the construction of the token provider with connecting on init`() {
        /* Given */
        /* When */
        StepVerifier.withVirtualTime {
            TokenProvider("foo", "clientId", client, true)
                    .tokenFlux
                    .take(3)
        }
                /* Then */
                .expectSubscription()
                .assertNext { assertThat(it.access_token).isEqualTo("1st") }
                .thenAwait(ofSeconds(5))
                .assertNext { assertThat(it.access_token).isEqualTo("2nd") }
                .thenAwait(ofSeconds(15))
                .assertNext { assertThat(it.access_token).isEqualTo("3rd") }
                .verifyComplete()
    }

    @Test
    @Timeout(5)
    fun `should renew token after midlife with direct call`() {
        /* Given */
        val tokenProvider = TokenProvider("foo", "clientId", client, false)

        /* When */
        StepVerifier.withVirtualTime {
            tokenProvider
                    .fetchTokenWithApiKey(AuthInformation(api_key = "key", grant_type = "api_key", client_id = "clientId"))
                    .take(3)
        }
                /* Then */
                .expectSubscription()
                .assertNext { assertThat(it.access_token).isEqualTo("1st") }
                .thenAwait(ofSeconds(5))
                .assertNext { assertThat(it.access_token).isEqualTo("2nd") }
                .thenAwait(ofSeconds(15))
                .assertNext { assertThat(it.access_token).isEqualTo("3rd") }
                .verifyComplete()
    }

    @Test
    @Timeout(5)
    fun `should work with a replay on sub flux`() {
        /* Given */
        val tokenProvider = TokenProvider("foo", "clientId", client)

        val tokenFlux = tokenProvider
                .fetchTokenWithApiKey(AuthInformation(api_key = "key", grant_type = "api_key", client_id = "clientId"))
                .log("tokenFlux")
                .replay(1)

        tokenFlux.connect()

        /* When */
        StepVerifier.withVirtualTime { tokenFlux.take(3) }
                /* Then */
                .expectSubscription()
                .assertNext { assertThat(it.access_token).isEqualTo("1st") }
                .thenAwait(ofSeconds(5))
                .assertNext { assertThat(it.access_token).isEqualTo("2nd") }
                .thenAwait(ofSeconds(15))
                .assertNext { assertThat(it.access_token).isEqualTo("3rd") }
                .verifyComplete()
    }

    @Test
    fun `should work on replay`() {
        /* Given */
        val flux = Flux.just(1, 2, 3)
                .log()
                .replay(1)
        /* When */
        flux.connect()
        /* Then */
        StepVerifier.create(Mono.from(flux))
                .expectSubscription()
                .expectNext(3)
                .verifyComplete()
    }
}

As before, I've removed some code for brevity. You can find the real code here

Expected Behavior

I was expecting the code and tests to behave as they did in 3.2.x: see test results

When used as replay(1), the replay should keep the last value and should not prevent new values from being published in the flux.
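
The expectation above can be illustrated with a small sketch. It is not taken from the linked repository: the function name `latestThenLive` and the use of `Flux.interval` as a stand-in for the token stream are assumptions made for illustration. A late subscriber should receive the cached latest value first, then the values that keep arriving.

```kotlin
import reactor.core.publisher.Flux
import java.time.Duration

// Hypothetical helper: connects a replay(1) flux before anyone subscribes,
// then subscribes late and collects three values.
fun latestThenLive(): List<Long> {
    // Flux.interval stands in for a source that keeps emitting over time.
    val flux = Flux.interval(Duration.ofMillis(50)).replay(1)
    val connection = flux.connect() // connect with no subscriber yet
    try {
        Thread.sleep(175) // let a few values be emitted and cached
        // A late subscriber should see the cached value, then live ones.
        // The timeout makes the call fail instead of hanging if no new value arrives.
        return flux.take(3).collectList().block(Duration.ofSeconds(2)) ?: emptyList()
    } finally {
        connection.dispose() // stop the upstream interval
    }
}

fun main() {
    println(latestThenLive())
}
```

With the behavior described for 3.2.x, the collected list should contain three consecutive values. With the behavior reported in this issue, the call would be expected to time out instead.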

Actual Behavior

The current behavior can be seen in the CI run for 3.3.2: see test results

The replay doesn't work as before and prevents new values from being published.

Steps to Reproduce

All the code is available here

Possible Solution

I don't know. It is either a bug in ConnectableFlux or wrong usage of the API on my side, but I don't think it's the latter 😅.

Your Environment

  • Reactor version(s) used: 3.2.12 OK, 3.3.2 KO
  • Other relevant libraries versions (eg. netty, ...): All is available in the pom xml in the project
  • JVM version (java -version): Java 11.0.5
  • OS and version (eg uname -a): MacOS, Linux

/cc @TheoCadoret

@simonbasle
Member

Can you try with 3.3.3.BUILD-SNAPSHOT? There was a regression around replay(n) in 3.3.0-3.3.2.

davinkevin commented Feb 3, 2020

I've tried, but I'm not sure I added the 3.3.3.BUILD-SNAPSHOT version to my Spring Boot application the right way.

I've created a branch with this change and set up CI on it; the bug still occurs 😅.

https://gitlab.com/davinkevin/issue-after-upgrade-to-3-3-2/-/jobs/424288390

To try the SNAPSHOT version, I added the following to the pom.xml:

...
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.3.3.BUILD-SNAPSHOT</version>
		</dependency>
...
	<repositories>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
		</repository>
		<repository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>
	<pluginRepositories>
		<pluginRepository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
		</pluginRepository>
		<pluginRepository>
			<id>spring-snapshots</id>
			<name>Spring Snapshots</name>
			<url>https://repo.spring.io/snapshot</url>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</pluginRepository>
	</pluginRepositories>

The diff is available here: https://gitlab.com/davinkevin/issue-after-upgrade-to-3-3-2/-/compare/master...reactor-3-3-3-BUILD-SNAPSHOT

simonbasle commented Feb 4, 2020

Yes that seems correct. In any case, the previous fix indeed didn't help your case. I have found a fix for your particular issue. Also since there are multiple issues around that initial regression, I opened a tracking issue: #2030

simonbasle added this to the 3.3.3.RELEASE milestone Feb 4, 2020
simonbasle added a commit that referenced this issue Feb 4, 2020
This commit ensures that Flux.replay falls back to the 3.2.x behavior
of requesting Long.MAX_VALUE when no early subscriber emitted requests
before the `connect()`.

Also fixes #2028
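
To see what the commit message describes, one can record the request amounts that replay(1) sends upstream when `connect()` is called before any subscriber arrives. This is only a sketch: the function name `upstreamRequestsOnConnect` and the `Flux.range` source are assumptions made for illustration, and the recorded values depend on the reactor-core version in use.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical helper: returns the request amounts seen by the source
// when connect() is called with no subscriber attached.
fun upstreamRequestsOnConnect(): List<Long> {
    val requests = mutableListOf<Long>()
    val flux = Flux.range(1, 5)
        .doOnRequest { requests.add(it) } // record each upstream request(n)
        .replay(1)
    flux.connect() // no subscriber has requested anything yet
    return requests
}

fun main() {
    println(upstreamRequestsOnConnect())
}
```

Going by the commit message, a version containing the fix would be expected to record a request of Long.MAX_VALUE here, as 3.2.x did.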
@TheoCadoret

For information, as a bypass we switched from replay(1) to cache(1) and it seems to be working just fine.

@davinkevin
Contributor Author

> For information, as a bypass we switched from replay(1) to cache(1) and it seems to be working just fine.

And we use a subscribe() in the constructor in place of connect().
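
A minimal sketch of this workaround, assuming a simplified provider: `CachedTokenProvider` and `tokenSource` are hypothetical names, and a fixed `Flux.just` stands in for the real token stream. It is not the code from the linked repository.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical stand-in for the token stream from the issue.
fun tokenSource(): Flux<String> = Flux.just("1st", "2nd", "3rd")

class CachedTokenProvider(source: Flux<String>) {
    // cache(1) keeps the latest value for late subscribers, like replay(1),
    // but returns a plain Flux instead of a ConnectableFlux.
    val tokenFlux: Flux<String> = source.cache(1)

    init {
        // Subscribing eagerly takes the place of connect(): it triggers the
        // upstream subscription as soon as the provider is constructed.
        tokenFlux.subscribe()
    }
}

fun main() {
    val provider = CachedTokenProvider(tokenSource())
    // A late subscriber receives what the cache retained.
    println(provider.tokenFlux.collectList().block())
}
```

Because the eager `subscribe()` requests an unbounded amount, the cache is not left waiting for demand from a later subscriber, which may be why this bypass avoids the regression.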

simonbasle added a commit that referenced this issue Feb 5, 2020
This commit ensures that Flux.replay falls back to the 3.2.x behavior
of requesting Long.MAX_VALUE when no early subscriber emitted requests
before the `connect()`.

Also fixes #2028
@simonbasle
Member

@TheoCadoret @davinkevin the fix has been merged and is available in 3.3.3.BUILD-SNAPSHOT, can you validate against your actual code?

@TheoCadoret

@simonbasle indeed, the tests run really smoothly with this one. Thanks for the quick reaction!

davinkevin commented Feb 5, 2020

CI of demo project ends well too 🤩

https://gitlab.com/davinkevin/issue-after-upgrade-to-3-3-2/-/jobs/426700006

Thanks, I hope this release will be included in Spring Boot soon.
