BoundedElasticScheduler does not invoke any tasks after interruption. #1973

Closed
jacekmg opened this issue Nov 25, 2019 · 3 comments · Fixed by #2040 or #2045
Assignees: simonbasle
Labels: type/bug (A general bug)

Comments


jacekmg commented Nov 25, 2019

I've stumbled upon a problem with BoundedElasticScheduler when high traffic creates many queued tasks on it. Invoking dispose() on scheduled tasks seems to break the scheduler, to the point where it no longer invokes any new tasks.

Expected Behavior

The scheduler should correctly invoke newly scheduled tasks after all of its other tasks have been disposed.

Actual Behavior

The scheduler does nothing after all of its tasks are disposed and a new one is scheduled.

Steps to Reproduce

In the following example, the behavior is as follows:

  • ~520 tasks are scheduled
  • ~300 scheduled tasks are processed correctly
  • When dispose() is called, all scheduled tasks are cancelled
  • InterruptedException occurs on most scheduled tasks, except the last ~15 (505-520)
  • A task scheduled after the dispose() call is never invoked.
    Logger logger = LoggerFactory.getLogger("Test");

    @Test
    public void test() throws InterruptedException {
        Scheduler scheduler =
                Schedulers.newBoundedElastic(30, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "test-scheduler");
        Disposable subscription = Flux.range(0, 100000000)
                .flatMap(i -> {
                    logger.info("Scheduling {}", i);
                    return Mono.fromSupplier(() -> invokeLongOperation(i))
                            .subscribeOn(scheduler)
                            .publishOn(Schedulers.parallel())
                            .doOnError(throwable -> logger.error("Error occurred in thread {} ", i, throwable))
                            .doOnCancel(() -> logger.info("Thread {} cancelled", i));
                })
                .subscribe(i -> logger.info("Thread {} finished", i),
                           throwable -> logger.error("Error ", throwable));
        Thread.sleep(10000);
        logger.info("Invoking dispose");
        subscription.dispose();
        Thread.sleep(1000);
        Mono.fromSupplier(() -> invokeLongOperation(999999999))
                .subscribeOn(scheduler)
                .publishOn(Schedulers.parallel())
                .subscribe((i) -> logger.info("Consumed value {} ", i),
                           throwable -> logger.error("Exception"));
        logger.info("Sleeping");
        Thread.sleep(100000);
    }

    private int invokeLongOperation(Integer i) {
        logger.info("Invoking long operation {}", i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            logger.error("Thread {} interrupted", i);
        }
        return i;
    }

Environment

  • Reactor version used: 3.3.0
  • JVM version: 1.8.0
@jacekmg jacekmg changed the title BoundedElasticScheduler do not invoke any tasks after interruption. BoundedElasticScheduler does not invoke any tasks after interruption. Nov 25, 2019
@simonbasle simonbasle self-assigned this Nov 25, 2019
@simonbasle simonbasle added the status/need-investigation This needs more in-depth investigation label Nov 25, 2019
@simonbasle
Member

Hi @jacekmg and thanks for the report. I was unable to reproduce the issue; the last task with i=999999999 always seems to end up executing correctly on my machine.

Note that the disposal of tasks inside the flatMap is sequential, and each disposal of an active Worker in the BoundedElasticScheduler leads to an attempt to reuse the underlying thread for the next task queued due to the thread cap. As a result, a number of tasks that had been scheduled but were still enqueued by the time subscription.dispose() is invoked will briefly start running and then be cancelled (i.e. interrupted).
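
For illustration, here is a minimal standalone sketch of that cancellation behavior outside Flux (the thread cap, task count, and sleep durations are arbitrary choices for the demo, not taken from the report):

import java.util.ArrayList;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DisposeInterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        // a cap of 2 threads, so most of the 10 tasks queue up behind the cap
        Scheduler scheduler = Schedulers.newBoundedElastic(2, 100, "demo");
        List<Disposable> tasks = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            tasks.add(scheduler.schedule(() -> {
                try {
                    Thread.sleep(5000); // simulate a long-running task
                } catch (InterruptedException e) {
                    // a disposed task that had already started lands here
                    System.out.println("task " + id + " interrupted");
                }
            }));
        }
        Thread.sleep(500);
        // sequential disposal: as a running task is interrupted, its thread may
        // briefly pick up a queued task, which is then cancelled in turn
        tasks.forEach(Disposable::dispose);
        Thread.sleep(1000);
        scheduler.dispose();
    }
}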

(As a side note, the unit test was not entirely useful because it relies on logging rather than capturing state (e.g. in an AtomicInteger or the like) and asserting that state at the end. The test as written is always green, which didn't help me check whether things were behaving as expected.)
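
For reference, a minimal sketch of a state-capturing variant of the original test body (the counter names and the final assertion are hypothetical additions for illustration; invokeLongOperation and scheduler are as defined in the report above, and java.util.concurrent.atomic.AtomicInteger is assumed imported):

AtomicInteger completed = new AtomicInteger();
AtomicInteger cancelled = new AtomicInteger();
AtomicInteger afterDispose = new AtomicInteger();

Disposable subscription = Flux.range(0, 1000)
        .flatMap(i -> Mono.fromSupplier(() -> invokeLongOperation(i))
                .subscribeOn(scheduler)
                .doOnCancel(cancelled::incrementAndGet))
        .subscribe(i -> completed.incrementAndGet());

Thread.sleep(10000);
subscription.dispose();
Thread.sleep(1000);

Mono.fromSupplier(() -> invokeLongOperation(999999999))
        .subscribeOn(scheduler)
        .subscribe(i -> afterDispose.incrementAndGet());
Thread.sleep(5000);

// goes red when the reported bug occurs, instead of the test
// silently staying green regardless of behavior
Assert.assertEquals(1, afterDispose.get());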

@simonbasle simonbasle added for/user-attention This issue needs user attention (feedback, rework, etc...) status/cannot-reproduce We cannot reproduce this issue. A repro case would help. and removed status/need-investigation This needs more in-depth investigation labels Nov 25, 2019

LSwiatek commented Jan 9, 2020

I've experienced the same issue.

At first glance it looks like a race condition when disposing ActiveWorker/DeferredWorker, which results in worker threads not being released to the idle pool when cancel is called on scheduled tasks.

My test case:

import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Elastic {

    private static final Logger LOG = LoggerFactory.getLogger(Elastic.class);
    private Scheduler scheduler;
    private LinkedList<MonoSink<String>> listeners;
    private ExecutorService producer;

    @BeforeEach
    void setUp() {
        scheduler = Schedulers.newBoundedElastic(3, 100000, "subscriberElastic", 600, true);
        listeners = new LinkedList<>();
        startProducer();
    }

    @AfterEach
    void tearDown() {
        producer.shutdown();
    }

    @Test
    void stuck() throws InterruptedException {
        List<Disposable> scheduled = new LinkedList<>();

        for (int i = 0; i < 50; i++) {
            scheduled.add(
                    Mono.create(this::addListener)
                            .subscribeOn(scheduler)
                            .subscribe(LOG::info)
            );
        }

        Thread.sleep(1000);
        scheduled.forEach(Disposable::dispose);
        Thread.sleep(1000);

        String result = Mono.create(this::addListener)
                .subscribeOn(scheduler)
                .map(res -> res + " the end")
                .block(Duration.ofSeconds(5));
        LOG.info(result);
        Assert.assertNotNull(result);

    }

    private void startProducer() {
        producer = Executors.newSingleThreadExecutor();
        producer.submit(() -> {
            int i = 0;
            while (true) {
                MonoSink<String> sink = listeners.poll();
                if (sink != null) {
                    sink.success(Integer.toString(i++));
                }

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    LOG.info("Producer stopping");
                    return;
                }
            }
        });
    }

    public void addListener(MonoSink<String> sink) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        listeners.add(sink);
        sink.onDispose(() -> removeListener(sink));
    }

    public void removeListener(MonoSink<String> sink) {
        listeners.remove(sink);
    }
}

On my machine it always ends with
java.lang.IllegalStateException: Timeout on blocking read for 5000 MILLISECONDS
You may tweak the number of threads and tasks if needed.

@simonbasle simonbasle added type/bug A general bug and removed for/user-attention This issue needs user attention (feedback, rework, etc...) status/cannot-reproduce We cannot reproduce this issue. A repro case would help. labels Feb 12, 2020
simonbasle added a commit that referenced this issue Feb 14, 2020
When the active worker becomes inactive and picks a deferred worker
to serve, but at the same time that deferred worker is cancelled or
disposed, the latter can be gone from the parent's pool (due to the
former's polling).

The issue is that the decrementing of REMAINING_TASK was guarded
by a successful removal of the deferred worker from the parent,
which in the race has already been removed and thus returns false.

The decrement must be done unconditionally at this step to fix
the issue.
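
In pseudocode terms, the fix amounts to something like the following (only REMAINING_TASK comes from the commit message above; the surrounding names are illustrative, not the actual BoundedElasticScheduler source):

// before: the decrement was guarded by a successful removal, so a
// deferred worker already removed by the racing side skipped it
if (parent.remove(deferredWorker)) {
    REMAINING_TASK.decrementAndGet(parent);
}

// after: removal may legitimately fail in the race, so the
// decrement happens unconditionally
parent.remove(deferredWorker);
REMAINING_TASK.decrementAndGet(parent);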
@simonbasle
Member

Also verified to be fixed by the new implementation in #2040.
