
Flux.cache causing flow to hang indefinitely #2711

Closed
amrynsky opened this issue May 26, 2021 · 5 comments · Fixed by #2741
Labels
  • status/has-workaround: This has a known workaround described
  • status/need-investigation: This needs more in-depth investigation
  • type/bug: A general bug

Comments

@amrynsky

The idea is to implement continuous data processing based on a source (i.e. continuously polling a portion of data for every account). Flux.cache is used to repeat the processing flow for the same source while periodically reloading it. In addition, we need to limit concurrency for the processing.

After several iterations the flow hangs indefinitely. I am still troubleshooting, but it looks like the combination of cache and flatMap with limited concurrency causes this behavior.

Expected Behavior

Flow is repeated indefinitely

Actual Behavior

Flow hangs indefinitely

Steps to Reproduce

Here is a simplified flow that could be used to reproduce the issue.

@Test
void test() {
	var flow = getSource()
			.doOnSubscribe(__ -> log.info("Loading source..."))
			.cache(Duration.ofSeconds(1))
			.log()
			.doOnSubscribe(__ -> log.info("Polling cycle starting..."))
			.flatMap(this::process, 2)
			.repeat(1000);

	StepVerifier.create(flow)
			.expectNextCount(1000 * 5)
			.verifyComplete();
}

private Flux<Integer> getSource() {
	return Flux.just(1, 2, 3, 4, 5);
}

private Mono<Integer> process(int channel) {
	return Mono.just(channel)
			.doOnNext(rec -> log.info("Processing: {}", rec))
			.delayElement(Duration.ofMillis(50));
}

Your Environment

  • Reactor version(s) used: 3.4.5, 3.4.6
  • JVM version (java -version): 14
  • OS and version (eg uname -a): Darwin M-C02YP071JHD4 20.4.0 Darwin Kernel Version 20.4.0: Thu Apr 22 21:46:47 PDT 2021; root:xnu-7195.101.2~1/RELEASE_X86_64 x86_64
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label May 26, 2021
@simonbasle simonbasle added status/need-investigation This needs more in-depth investigation type/bug A general bug and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels May 31, 2021
@simonbasle simonbasle added this to the 3.4.7 milestone May 31, 2021
@simonbasle
Member

simonbasle commented May 31, 2021

I was able to reproduce it indeed. Due to the timing involved, the reproducer is somewhat nondeterministic, but I found that I could simplify the example and make it deterministic by using a VirtualTimeScheduler for the cache part (since cache only uses the clock aspect of the scheduler). The delayElement in process then becomes redundant.

Here is my version of the repro case:

	@Test
	void test() throws InterruptedException {
		//vtsStop(); //only important because I added the test in FluxReplayTest.java
		VirtualTimeScheduler vts = VirtualTimeScheduler.create();
		AtomicInteger sourceLoad = new AtomicInteger();
		AtomicInteger pollStart = new AtomicInteger();
		final AtomicInteger pollEnd = new AtomicInteger();

		Flux<String> flow = getSource()
				.doOnSubscribe(__ -> log.info("Loading source #" + sourceLoad.incrementAndGet()))
				.cache(Duration.ofSeconds(1), vts)
				.flatMap(v -> {
					if (v == 1) {
						pollStart.incrementAndGet();
					}
					else if (v == 5) { //this assumes a 5-element source
						pollEnd.incrementAndGet();
					}
					return process(pollStart.get() + "_" + v, vts);
				}, 2)
				.repeat(3);

		StepVerifier.create(flow)
				.expectNextCount(4 * 5)
				.expectComplete()
				.verify(Duration.ofSeconds(300));
	}

	private Flux<Integer> getSource() {
		return Flux.just(1, 2, 3, 4, 5)
				.doOnRequest(r -> log.info("source.request({})", r == Long.MAX_VALUE ? "unbounded" : r))
				.hide();
	}

	private Mono<String> process(String channel, VirtualTimeScheduler timeScheduler) {
		if (channel.equals("2_4")) {
			timeScheduler.advanceTimeBy(Duration.ofMillis(1001));
		}
		return Mono.fromCallable(() -> {
			log.info("Processing: {}", channel);
			return channel;
		});
	}

@amrynsky
Author

amrynsky commented Jun 1, 2021

As a temporary workaround, Mono.cache (on the collected list) can be used instead:

var flux = getSource()
    .doOnSubscribe(__ -> log.info("Loading source..."))
    .collectList()
    .cache(Duration.ofSeconds(2))
    .flatMapIterable(Function.identity())
    .doOnSubscribe(__ -> log.info("Polling cycle starting..."))
    .flatMap(this::process, 2)
    .repeat();
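
The snippet above references helpers from the original report (process, log). A self-contained sketch of the same workaround, assuming reactor-core on the classpath (the bounded repeat(2), the 10 ms delay, and the class name are illustrative stand-ins for the original helpers):

```java
import java.time.Duration;
import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoCacheWorkaround {
	public static void main(String[] args) {
		Flux<Integer> flow = Flux.just(1, 2, 3, 4, 5)
				// collect the source into a single List and cache it as a Mono,
				// sidestepping Flux.cache / FluxReplay entirely
				.collectList()
				.cache(Duration.ofSeconds(2))
				// re-expand the cached List into a Flux for each polling cycle
				.flatMapIterable(Function.identity())
				// concurrency-limited processing, as in the original flow
				.flatMap(v -> Mono.just(v).delayElement(Duration.ofMillis(10)), 2)
				.repeat(2); // 3 polling cycles in total

		// 3 cycles x 5 elements = 15 emissions
		System.out.println(flow.count().block());
	}
}
```

Because the cached stage is a Mono rather than a Flux, the replay buffering happens in MonoCacheTime instead of FluxReplay, which avoids the hang described in this issue.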

@simonbasle simonbasle added the status/has-workaround This has a known workaround described label Jun 21, 2021
@simonbasle simonbasle modified the milestones: 3.4.7, 3.4.8 Jun 21, 2021
@simonbasle
Member

We unfortunately still haven't gotten to the bottom of this; moving it back to the 3.4.x backlog.

@simonbasle simonbasle modified the milestones: 3.4.8, 3.4.x Backlog Jul 12, 2021
@simonbasle simonbasle linked a pull request Aug 9, 2021 that will close this issue
@simonbasle simonbasle modified the milestones: 3.4.x Backlog, 3.4.10 Sep 9, 2021
simonbasle added a commit that referenced this issue Sep 9, 2021
This commit reworks the FluxReplay implementation to use a prefetch
strategy and state-machine-like bookkeeping to better track demand and
avoid hangs.

The operator is driven by active subscribers, so the next prefetch
happens only once all subscribers have consumed a given element. To
enable that, every implementation now maintains an index used to
identify whether a specific element triggers the next prefetch.

When there are no subscribers, the prefetch strategy is self-driven
and requests more once N elements have been produced into the cache.

The downside is that historySize 0 is no longer supported. It is
believed that it wasn't really relevant, but on the slight possibility
that it is actually used out there, FluxReplay is transparently
replaced by a FluxPublish when 0 is passed to the operator method.

On the `Sinks` side, since part of the implementation is shared, the
same limitation is present, but we are more proactive in rejecting a
zero limit / historySize there. The bet is that for this younger API,
such incoherent usage hasn't emerged yet.

Fixes #2711.

Co-authored-by: Simon Baslé <sbasle@vmware.com>
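
The rework above changes FluxReplay's internals but preserves the operator's replay contract. As a minimal sketch of that contract (assuming reactor-core on the classpath; the 10-second TTL, the counter, and the class name are illustrative), a second subscription within the TTL should be served from the replay buffer without re-subscribing the source:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

public class ReplayContractSketch {
	public static void main(String[] args) {
		AtomicInteger loads = new AtomicInteger();

		// Cold 5-element source, cached with a TTL; the source should be
		// subscribed only once while the cached entries are still valid.
		Flux<Integer> cached = Flux.just(1, 2, 3, 4, 5)
				.doOnSubscribe(s -> loads.incrementAndGet())
				.cache(Duration.ofSeconds(10));

		long first = cached.count().block();   // triggers the source load
		long second = cached.count().block();  // replayed from the cache

		System.out.println(first + " " + second + " loads=" + loads.get());
	}
}
```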
@ginkel

ginkel commented Dec 27, 2021

Hi there,

apparently I have an application that breaks when upgrading reactor-core from 3.4.9 to 3.4.10. A git bisect points to commit a596764 as the probable cause. I'm far from having a reproducer due to the complexity of the flows involved, and would appreciate some guidance on how to proceed. If you'd prefer that I open a new issue, please let me know.

Thanks,
Thilo

@OlegDokuka
Contributor

@ginkel, feel free to open a new issue if you can describe your problem.

5 participants