From 1bf943aea645c7e6bf5c19e71891b5eaab568bf0 Mon Sep 17 00:00:00 2001 From: Yaroslav Stavnichiy Date: Sat, 11 Jan 2020 21:36:35 +0300 Subject: [PATCH] Issue-1251: add explicit `defer` parameter to `VirtualTimeScheduler.advanceTime_` methods --- .../test/DefaultStepVerifierBuilder.java | 3 +- .../test/scheduler/VirtualTimeScheduler.java | 71 ++++++++++++------- .../java/reactor/test/StepVerifierTests.java | 4 +- .../scheduler/VirtualTimeSchedulerTests.java | 10 ++- 4 files changed, 55 insertions(+), 33 deletions(-) diff --git a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java index 9600573624..09a81c7745 100644 --- a/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java +++ b/reactor-test/src/main/java/reactor/test/DefaultStepVerifierBuilder.java @@ -59,7 +59,6 @@ import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.publisher.ParallelFlux; import reactor.core.publisher.Signal; import reactor.test.scheduler.VirtualTimeScheduler; import reactor.util.Logger; @@ -2318,7 +2317,7 @@ static void virtualOrRealWait(Duration duration, DefaultVerifySubscriber s) s.completeLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS); } else { - s.virtualTimeScheduler.advanceTimeBy(duration); + s.virtualTimeScheduler.advanceTimeBy(duration, true); } } diff --git a/reactor-test/src/main/java/reactor/test/scheduler/VirtualTimeScheduler.java b/reactor-test/src/main/java/reactor/test/scheduler/VirtualTimeScheduler.java index d9de959354..9c980dced9 100644 --- a/reactor-test/src/main/java/reactor/test/scheduler/VirtualTimeScheduler.java +++ b/reactor-test/src/main/java/reactor/test/scheduler/VirtualTimeScheduler.java @@ -38,7 +38,7 @@ /** * A {@link Scheduler} that uses a virtual clock, allowing to manipulate time - * (eg. in tests). Can replace the default reactor schedulers by using + * (eg. in tests). Can replace the default reactor schedulers by using * the {@link #getOrSet} / {@link #set(VirtualTimeScheduler)} methods. * * @author Stephane Maldini @@ -215,19 +215,38 @@ public void advanceTime() { * @param delayTime the amount of time to move the {@link VirtualTimeScheduler}'s clock forward */ public void advanceTimeBy(Duration delayTime) { - advanceTime(delayTime.toNanos()); + advanceTimeBy(delayTime, false); + } + + /** + * Moves the {@link VirtualTimeScheduler}'s clock forward by a specified amount of time. + * + * @param delayTime the amount of time to move the {@link VirtualTimeScheduler}'s clock forward + * @param defer true to defer the clock move until there are tasks in queue + */ + public void advanceTimeBy(Duration delayTime, boolean defer) { + advanceTime(delayTime.toNanos(), defer); } /** * Moves the {@link VirtualTimeScheduler}'s clock to a particular moment in time. * - * @param instant the point in time to move the {@link VirtualTimeScheduler}'s - * clock to + * @param instant the point in time to move the {@link VirtualTimeScheduler}'s clock to */ public void advanceTimeTo(Instant instant) { + advanceTimeTo(instant, false); + } + + /** + * Moves the {@link VirtualTimeScheduler}'s clock to a particular moment in time. + * + * @param instant the point in time to move the {@link VirtualTimeScheduler}'s clock to + * @param defer true to defer the clock move until there are tasks in queue + */ + public void advanceTimeTo(Instant instant, boolean defer) { long targetTime = TimeUnit.NANOSECONDS.convert(instant.toEpochMilli(), TimeUnit.MILLISECONDS); - advanceTime(targetTime - nanoTime); + advanceTime(targetTime - nanoTime, defer); } /** @@ -308,9 +327,11 @@ public Disposable schedulePeriodically(Runnable task, return periodicTask; } - final void advanceTime(long timeShiftInNanoseconds) { + final void advanceTime(long timeShiftInNanoseconds, boolean defer) { Operators.addCap(DEFERRED_NANO_TIME, this, timeShiftInNanoseconds); - drain(); + if (!defer || !queue.isEmpty()) { + drain(); + } } final void drain() { @@ -319,27 +340,25 @@ final void drain() { return; } for(;;) { - if (!queue.isEmpty()) { - //resetting for the first time a delayed schedule is called after a deferredNanoTime is set - long targetNanoTime = nanoTime + DEFERRED_NANO_TIME.getAndSet(this, 0); - - while (!queue.isEmpty()) { - TimedRunnable current = queue.peek(); - if (current == null || current.time > targetNanoTime) { - break; - } - //for the benefit of tasks that call `now()` - // if scheduled time is 0 (immediate) use current virtual time - nanoTime = current.time == 0 ? nanoTime : current.time; - queue.remove(); - - // Only execute if not unsubscribed - if (!current.scheduler.shutdown) { - current.run.run(); - } + //resetting for the first time a delayed schedule is called after a deferredNanoTime is set + long targetNanoTime = nanoTime + DEFERRED_NANO_TIME.getAndSet(this, 0); + + while (!queue.isEmpty()) { + TimedRunnable current = queue.peek(); + if (current == null || current.time > targetNanoTime) { + break; + } + //for the benefit of tasks that call `now()` + // if scheduled time is 0 (immediate) use current virtual time + nanoTime = current.time == 0 ? nanoTime : current.time; + queue.remove(); + + // Only execute if not unsubscribed + if (!current.scheduler.shutdown) { + current.run.run(); } - nanoTime = targetNanoTime; } + nanoTime = targetNanoTime; remainingWork = ADVANCE_TIME_WIP.addAndGet(this, -remainingWork); if (remainingWork == 0) { diff --git a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java index 293f1dc9a3..29f2d1b4ff 100644 --- a/reactor-test/src/test/java/reactor/test/StepVerifierTests.java +++ b/reactor-test/src/test/java/reactor/test/StepVerifierTests.java @@ -2131,7 +2131,7 @@ public void parallelVerifyWithVtsMutuallyExclusive() { } } - @Test + @Test(timeout = 5000) public void gh783() { int size = 1; Scheduler parallel = Schedulers.newParallel("gh-783"); @@ -2150,7 +2150,7 @@ public void gh783() { .verifyComplete(); } - @Test + @Test(timeout = 5000) public void gh783_deferredAdvanceTime() { int size = 61; Scheduler parallel = Schedulers.newParallel("gh-783"); diff --git a/reactor-test/src/test/java/reactor/test/scheduler/VirtualTimeSchedulerTests.java b/reactor-test/src/test/java/reactor/test/scheduler/VirtualTimeSchedulerTests.java index f8e6d21f4b..7731f275f9 100644 --- a/reactor-test/src/test/java/reactor/test/scheduler/VirtualTimeSchedulerTests.java +++ b/reactor-test/src/test/java/reactor/test/scheduler/VirtualTimeSchedulerTests.java @@ -132,7 +132,7 @@ public void captureNowInScheduledTask() { List periodicExecutionTimestamps = new ArrayList<>(); try { - vts.advanceTimeBy(Duration.ofMillis(100)); + vts.advanceTimeBy(Duration.ofMillis(100), true); vts.schedule(() -> singleExecutionsTimestamps.add(vts.now(TimeUnit.MILLISECONDS)), 100, TimeUnit.MILLISECONDS); @@ -192,6 +192,10 @@ public void racingAdvanceTimeOnEmptyQueue() { assertThat(vts.now(TimeUnit.MILLISECONDS)) .as("iteration " + i) .isEqualTo(13_000 * i); + + assertThat(vts.nanoTime) + .as("now() == nanoTime in iteration " + i) + .isEqualTo(vts.now(TimeUnit.NANOSECONDS)); } } finally { @@ -230,8 +234,8 @@ public void racingAdvanceTimeOnVaryingQueue() { try { for (int i = 1; i <= 100; i++) { RaceTestUtils.race( - () -> vts.advanceTimeBy(Duration.ofSeconds(10)), - () -> vts.advanceTimeBy(Duration.ofSeconds(3))); + () -> vts.advanceTimeBy(Duration.ofSeconds(10), true), + () -> vts.advanceTimeBy(Duration.ofSeconds(3), true)); if (i % 10 == 0) { vts.schedule(count::incrementAndGet, 14, TimeUnit.SECONDS);