Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #1251 Add explicit defer parameter to VirtualTimeScheduler.advanceTime_ methods #2012

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -59,7 +59,6 @@
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Signal;
import reactor.test.scheduler.VirtualTimeScheduler;
import reactor.util.Logger;
Expand Down Expand Up @@ -2318,7 +2317,7 @@ static void virtualOrRealWait(Duration duration, DefaultVerifySubscriber<?> s)
s.completeLatch.await(duration.toMillis(), TimeUnit.MILLISECONDS);
}
else {
s.virtualTimeScheduler.advanceTimeBy(duration);
s.virtualTimeScheduler.advanceTimeBy(duration, true);
}
}

Expand Down
Expand Up @@ -38,7 +38,7 @@

/**
* A {@link Scheduler} that uses a virtual clock, allowing to manipulate time
* (eg. in tests). Can replace the default reactor schedulers by using
* (eg. in tests). Can replace the default reactor schedulers by using
* the {@link #getOrSet} / {@link #set(VirtualTimeScheduler)} methods.
*
* @author Stephane Maldini
Expand Down Expand Up @@ -215,19 +215,38 @@ public void advanceTime() {
* @param delayTime the amount of time to move the {@link VirtualTimeScheduler}'s clock forward
*/
public void advanceTimeBy(Duration delayTime) {
advanceTime(delayTime.toNanos());
advanceTimeBy(delayTime, false);
}

/**
* Moves the {@link VirtualTimeScheduler}'s clock forward by a specified amount of time.
*
* @param delayTime the amount of time to move the {@link VirtualTimeScheduler}'s clock forward
* @param defer true to defer the clock move until there are tasks in queue
*/
public void advanceTimeBy(Duration delayTime, boolean defer) {
advanceTime(delayTime.toNanos(), defer);
}

/**
* Moves the {@link VirtualTimeScheduler}'s clock to a particular moment in time.
*
* @param instant the point in time to move the {@link VirtualTimeScheduler}'s
* clock to
* @param instant the point in time to move the {@link VirtualTimeScheduler}'s clock to
*/
public void advanceTimeTo(Instant instant) {
advanceTimeTo(instant, false);
}

/**
* Moves the {@link VirtualTimeScheduler}'s clock to a particular moment in time.
*
* @param instant the point in time to move the {@link VirtualTimeScheduler}'s clock to
* @param defer true to defer the clock move until there are tasks in queue
*/
public void advanceTimeTo(Instant instant, boolean defer) {
long targetTime = TimeUnit.NANOSECONDS.convert(instant.toEpochMilli(),
TimeUnit.MILLISECONDS);
advanceTime(targetTime - nanoTime);
advanceTime(targetTime - nanoTime, defer);
}

/**
Expand Down Expand Up @@ -308,9 +327,11 @@ public Disposable schedulePeriodically(Runnable task,
return periodicTask;
}

final void advanceTime(long timeShiftInNanoseconds) {
final void advanceTime(long timeShiftInNanoseconds, boolean defer) {
Operators.addCap(DEFERRED_NANO_TIME, this, timeShiftInNanoseconds);
drain();
if (!defer || !queue.isEmpty()) {
drain();
}
}

final void drain() {
Expand All @@ -319,27 +340,25 @@ final void drain() {
return;
}
for(;;) {
if (!queue.isEmpty()) {
//resetting for the first time a delayed schedule is called after a deferredNanoTime is set
long targetNanoTime = nanoTime + DEFERRED_NANO_TIME.getAndSet(this, 0);

while (!queue.isEmpty()) {
TimedRunnable current = queue.peek();
if (current == null || current.time > targetNanoTime) {
break;
}
//for the benefit of tasks that call `now()`
// if scheduled time is 0 (immediate) use current virtual time
nanoTime = current.time == 0 ? nanoTime : current.time;
queue.remove();

// Only execute if not unsubscribed
if (!current.scheduler.shutdown) {
current.run.run();
}
//resetting for the first time a delayed schedule is called after a deferredNanoTime is set
long targetNanoTime = nanoTime + DEFERRED_NANO_TIME.getAndSet(this, 0);

while (!queue.isEmpty()) {
TimedRunnable current = queue.peek();
if (current == null || current.time > targetNanoTime) {
break;
}
//for the benefit of tasks that call `now()`
// if scheduled time is 0 (immediate) use current virtual time
nanoTime = current.time == 0 ? nanoTime : current.time;
queue.remove();

// Only execute if not unsubscribed
if (!current.scheduler.shutdown) {
current.run.run();
}
nanoTime = targetNanoTime;
}
nanoTime = targetNanoTime;

remainingWork = ADVANCE_TIME_WIP.addAndGet(this, -remainingWork);
if (remainingWork == 0) {
Expand Down
Expand Up @@ -2131,7 +2131,7 @@ public void parallelVerifyWithVtsMutuallyExclusive() {
}
}

@Test
@Test(timeout = 5000)
public void gh783() {
int size = 1;
Scheduler parallel = Schedulers.newParallel("gh-783");
Expand All @@ -2150,7 +2150,7 @@ public void gh783() {
.verifyComplete();
}

@Test
@Test(timeout = 5000)
public void gh783_deferredAdvanceTime() {
int size = 61;
Scheduler parallel = Schedulers.newParallel("gh-783");
Expand Down
Expand Up @@ -132,7 +132,7 @@ public void captureNowInScheduledTask() {
List<Long> periodicExecutionTimestamps = new ArrayList<>();

try {
vts.advanceTimeBy(Duration.ofMillis(100));
vts.advanceTimeBy(Duration.ofMillis(100), true);

vts.schedule(() -> singleExecutionsTimestamps.add(vts.now(TimeUnit.MILLISECONDS)),
100, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -192,6 +192,10 @@ public void racingAdvanceTimeOnEmptyQueue() {
assertThat(vts.now(TimeUnit.MILLISECONDS))
.as("iteration " + i)
.isEqualTo(13_000 * i);

assertThat(vts.nanoTime)
.as("now() == nanoTime in iteration " + i)
.isEqualTo(vts.now(TimeUnit.NANOSECONDS));
}
}
finally {
Expand Down Expand Up @@ -230,8 +234,8 @@ public void racingAdvanceTimeOnVaryingQueue() {
try {
for (int i = 1; i <= 100; i++) {
RaceTestUtils.race(
() -> vts.advanceTimeBy(Duration.ofSeconds(10)),
() -> vts.advanceTimeBy(Duration.ofSeconds(3)));
() -> vts.advanceTimeBy(Duration.ofSeconds(10), true),
() -> vts.advanceTimeBy(Duration.ofSeconds(3), true));

if (i % 10 == 0) {
vts.schedule(count::incrementAndGet, 14, TimeUnit.SECONDS);
Expand Down