Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow blocking calls in WorkerTask#dispose #3213

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,50 @@ public void shouldDetectBlockingCallsInOperators() {
}

@Test
public void shouldNotReportScheduledFutureTask() {
for (int i = 0; i < 1_000; i++) {
public void shouldNotReportSchedulerScheduledFutureTask() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've adopted this test with learnings from writing the test below.

for (int i = 0; i < 10_000; i++) {
Scheduler taskScheduler = Schedulers.newSingle("foo");
try {
Runnable dummyRunnable = () -> {
};

for (int j = 0; j < 257; j++) {
taskScheduler.schedule(dummyRunnable, 200, TimeUnit.MILLISECONDS);
taskScheduler.schedule(dummyRunnable, j + 1, TimeUnit.MILLISECONDS);
}
Disposable disposable = taskScheduler.schedule(dummyRunnable, 1, TimeUnit.SECONDS);

RaceTestUtils.race(disposable::dispose, disposable::dispose);
RaceTestUtils.race(Schedulers.parallel(), disposable::dispose);
}
finally {
taskScheduler.dispose();
}
}
}

@Test
public void shouldNotReportWorkerScheduledFutureTask() {
for (int i = 0; i < 10_000; i++) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On my local machine 10_000 was reliable in producing the exception -- 1_000 not so much. Let me know if this is acceptable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does not have to be the same for CI env since it increases the general run time. Thus, let's keep it as it was before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a follow up to this comment, I created #3216 which would create an opportunity to use a more heavy crunching when run locally and do a quicker run-through via CI.

Scheduler scheduler = Schedulers.newSingle("foo");
Scheduler.Worker worker = scheduler.createWorker();

try {
Runnable dummyRunnable = () -> {
};

for (int j = 0; j < 257; j++) {
worker.schedule(dummyRunnable, j + 1, TimeUnit.MILLISECONDS);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the race was between dispose (and consequently, queue#remove) and the queue picking up another task, I found that having alternate scheduling increased the likelihood of hitting the issue.

}
Disposable disposable = worker.schedule(dummyRunnable, 1, TimeUnit.SECONDS);

RaceTestUtils.race(Schedulers.parallel(), disposable::dispose);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RaceTestUtils#race was refactored at some point to default to the bounded elastic scheduler on which blocking is "allowed", making the above test previously moot.

I use the RaceTestUtils even though we're not racing anymore as it's convenient for grabbing/throwing the exceptions.

}
finally {
worker.dispose();
scheduler.dispose();
}
}
}

void expectBlockingCall(String desc, Consumer<CompletableFuture<Object>> callable) {
Assertions
.assertThatThrownBy(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void applyTo(BlockHound.Builder builder) {

// Calls ScheduledFutureTask#cancel that may short park in DelayedWorkQueue#remove for getting a lock
builder.allowBlockingCallsInside(SchedulerTask.class.getName(), "dispose");
builder.allowBlockingCallsInside(WorkerTask.class.getName(), "dispose");

builder.allowBlockingCallsInside(ThreadPoolExecutor.class.getName(), "processWorkerExit");
}
Expand Down