
Removal listener not called #859

Closed

Description

@mario-schwede-hivemq

First, thank you for this really great library!

Now to my issue:

I have the problem that the removal listener is sometimes not executed in low-throughput situations.
I use the most recent version, 3.1.2. As a workaround, I created a scheduled task that calls cleanUp() every second instead of setting a scheduler.
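
A sketch of such a workaround (class and field names are illustrative only):

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class PeriodicCleanUpWorkaround {
    private final ScheduledExecutorService cleanUpExecutor =
            Executors.newSingleThreadScheduledExecutor();

    private final Cache<Object, Object> cache = Caffeine.newBuilder()
            .expireAfterWrite(Duration.ofMillis(5))
            .removalListener((key, value, cause) -> { /* handle removal */ })
            .build();

    public void start() {
        // Run the cache's pending maintenance (expiration, listener notification) every second,
        // instead of configuring Caffeine's own scheduler.
        cleanUpExecutor.scheduleWithFixedDelay(cache::cleanUp, 1, 1, TimeUnit.SECONDS);
    }
}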

Here is a reproducer:

package caffeine;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import org.junit.jupiter.api.RepeatedTest;

import java.time.Duration;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.fail;

public class CaffeineReproducer {
    private static final Object VALUE = new Object();
    private static final int NUMBER_OF_RUNS = 100_000;
    private static final int NUMBER_OF_KEYS = 10;
    private static final long WAIT_NANOS = TimeUnit.SECONDS.toNanos(10);

    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final AtomicInteger failedRun = new AtomicInteger(-1);

    @RepeatedTest(10)
    void testRunRemovalListener() throws InterruptedException {
        for (int i = 0; i < NUMBER_OF_RUNS; i++) {
            if (failedRun.get() != -1) {
                if (failedRun.get() == Integer.MIN_VALUE) {
                    fail("Interrupted");
                } else {
                    fail("Removal listener not called on run: " + failedRun.get());
                }
                return;
            }
            final int run = i + 1;
            executor.execute(() -> runTest(run));
            TimeUnit.MILLISECONDS.sleep(1);
        }
    }

    private void runTest(final int run) {
        final CopyOnWriteArrayList<Object> list = new CopyOnWriteArrayList<>();

        final Cache<Object, Object> cache = Caffeine.newBuilder()
                .scheduler(Scheduler.systemScheduler())
                .executor(executor)
                .removalListener((key, value, cause) -> list.add(value))
                .expireAfterWrite(Duration.ofMillis(5))
                .build();

        for (int i = 0; i < NUMBER_OF_KEYS; i++) {
            final int key = i;
            executor.execute(() -> cache.put(key, VALUE));
        }

        final long start = System.nanoTime();
        while (list.size() < NUMBER_OF_KEYS) {
            if ((System.nanoTime() - start) >= WAIT_NANOS) {
                failedRun.set(run);
                return;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (final InterruptedException e) {
                failedRun.set(Integer.MIN_VALUE);
                return;
            }
        }
    }
}

Activity

ben-manes (Owner) commented on Jan 12, 2023

Hi @mario-schwede-hivemq ,

When I add a println to your test's removal listener, I see it being called, but the test still fails, claiming that no calls were made.

.removalListener((key, value, cause) -> {
  System.out.printf("#%s - %s: %s=%s%n", run, cause, key, value);
  list.add(value);
})
#23 - EXPIRED: 0=java.lang.Object@6688f5b6
#31 - EXPIRED: 0=java.lang.Object@6688f5b6
#27 - EXPIRED: 0=java.lang.Object@6688f5b6
#8 - EXPIRED: 1=java.lang.Object@6688f5b6
#8 - EXPIRED: 2=java.lang.Object@6688f5b6
org.opentest4j.AssertionFailedError: Removal listener not called on run: 27
	at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
	at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
	at CaffeineReproducer.testRunRemovalListener(CaffeineReproducer.java:32)

ben-manes (Owner) commented on Jan 12, 2023

An issue with the test is that it does not hold a strong reference to the cache. The scheduled cleanup task only holds a weak reference to it, so that if the task outlives the cache's use, it can no-op and let the cache be garbage collected. It doesn't make much sense for a cache that the application has discarded to be kept alive by pending expiration; a proper shutdown or a long-lived instance would be more reasonable application logic. That seems to be what halts the accumulation, and it explains at least part of the behavior.
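
For illustration, a minimal sketch (not code from this issue) of the difference: a cache held only by a local variable in a short-lived task can be collected before its entries expire, whereas a field keeps it strongly reachable so the listener can still fire.

import java.time.Duration;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;

class StrongReferenceSketch {
  // Strong reference: the cache stays reachable for the lifetime of this object,
  // so the scheduler's weakly referenced cleanup task still has a live cache to work on.
  private final Cache<Object, Object> cache = Caffeine.newBuilder()
      .scheduler(Scheduler.systemScheduler())
      .expireAfterWrite(Duration.ofMillis(5))
      .removalListener((key, value, cause) -> System.out.println("removed " + key))
      .build();

  void use() {
    cache.put("key", new Object());
    // The removal listener can still fire later because 'this.cache' remains reachable.
  }
}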

mario-schwede-hivemq (Author) commented on Jan 12, 2023

Thank you for your investigation, and you are right: the reproducer has the problem that the cache can be garbage collected. I fixed that and now the test is green.
This leaves me a bit confused, because in the real-life code where I observed this behavior, the cache is an instance variable and cannot be collected. And of course most of the time it works as expected and the removal listener is called.
I have to dig into that code again. Maybe I overlooked something.

ben-manes (Owner) commented on Jan 12, 2023

When I rewrite your test to capture all of the run state and use a CountDownLatch to track completion, I see cases where the entries are in the data map, the expiration policy's data structures are empty because they have not yet been updated, there is no scheduled future because there is nothing to expire, and the pending additions are waiting in the write buffer. Once another cache operation occurs, like a read or write, the write buffer is flushed and everything quiesces to the desired state.

As scheduling is documented as best-effort, from that perspective this is acceptable: there will be benign orchestration races which do not result in invalid state or operations. However, if you rely on strict timing of events for business logic and otherwise leave the cache idle, then this divergence of goals could cause you problems. In that case it might make sense to use a stricter timing subsystem rather than rely on the cache's fuzziness. There may be some tweaks where we can do a bit better, but I am not sure yet whether that would fully resolve this scenario or simply make it less likely.
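
For example, one option (a sketch with assumed names, not a specific recommendation) is to drive business-critical expiry from a dedicated timer rather than from the cache's maintenance:

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class StrictExpiryTimer {
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

  void onPut(Object key, Duration ttl) {
    // Fire the business logic at the intended expiry time, independent of whether the
    // cache happens to run its maintenance while idle.
    timer.schedule(() -> handleExpiry(key), ttl.toMillis(), TimeUnit.MILLISECONDS);
  }

  private void handleExpiry(Object key) {
    // business logic that must not wait for the cache's best-effort cleanup
  }
}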

In a cache every read is a write to shared state, such as reordering items on an LRU list. This is why traditional caches do not scale well: every access requires obtaining an exclusive lock to update the policy metadata. The approach that Caffeine uses is to update the ConcurrentHashMap immediately, stage these events into ring buffers, and schedule a non-blocking task to replay them onto the policy. This way we can absorb concurrent reads and writes without blocking or contention, guard the policy with a lock so we can use the best algorithms, and quiesce to the desired outcome.
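
A highly simplified sketch of that pattern (this is not Caffeine's actual code; names and structure are illustrative only):

import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class BufferAndReplaySketch<K, V> {
  private final ConcurrentHashMap<K, V> data = new ConcurrentHashMap<>();
  private final Queue<Runnable> writeBuffer = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean drainScheduled = new AtomicBoolean();
  private final ReentrantLock evictionLock = new ReentrantLock();
  private final Executor executor = ForkJoinPool.commonPool();

  void put(K key, V value) {
    data.put(key, value);                        // 1. update the hash table immediately
    writeBuffer.add(() -> onWrite(key, value));  // 2. stage the policy event in a buffer
    scheduleDrainIfNeeded();                     // 3. ask for an asynchronous replay
  }

  private void scheduleDrainIfNeeded() {
    if (drainScheduled.compareAndSet(false, true)) {
      executor.execute(this::drainBuffers);
    }
  }

  private void drainBuffers() {
    evictionLock.lock();
    try {
      Runnable event;
      while ((event = writeBuffer.poll()) != null) {
        event.run();                             // replay events onto the policy under the lock
      }
    } finally {
      drainScheduled.set(false);
      evictionLock.unlock();
    }
  }

  private void onWrite(K key, V value) {
    // policy bookkeeping (expiration timestamps, LRU reordering, ...) would go here
  }
}

Note that even this toy version has the kind of race described next: a write staged after the drain loop empties the buffer, but before the flag is reset, will not trigger another drain while the cache stays idle.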

This orchestration means there can be races. Here the maintenance work is running: it has flushed the write buffer and is busy expiring all of the entries the policy is aware of. A concurrent write adds the data entry and the write event and updates a flag, but does not schedule a new maintenance task because one is already running. When the maintenance finishes and releases the lock, nothing is scheduled even though new work is pending. If you use the default ForkJoinPool then we can safely schedule a new task immediately and the test passes without any hiccups. However, with a custom executor we might over-penalize it, e.g. one with a caller-runs policy, so I don't know whether it is safe / acceptable / friendly to reschedule immediately. The cache's own logic is very fast, but when we call listeners we don't know the cost of that foreign user code. In your test, since the cache is then idle, nothing triggers the next maintenance run and the listener is not yet notified.

Does that make sense? Do you have any suggestions for how this could be improved, etc?

Optimistic rescheduling

/**
 * Performs the maintenance work, blocking until the lock is acquired.
 *
 * @param task an additional pending task to run, or {@code null} if not present
 */
void performCleanUp(@Nullable Runnable task) {
  evictionLock.lock();
  try {
    maintenance(task);
  } finally {
    evictionLock.unlock();
  }
  if ((drainStatusOpaque() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {
    scheduleDrainBuffers();
  }
}

Test (screenshot omitted)
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.RepeatedTest;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;

public class CaffeineReproducer {
  private static final Object VALUE = new Object();
  private static final int NUMBER_OF_RUNS = 100_000;
  private static final int NUMBER_OF_KEYS = 10;

  private final ExecutorService cacheExecutor = Executors.newFixedThreadPool(200);
  private final ExecutorService testExecutor = Executors.newCachedThreadPool();
  private final ConcurrentHashMap<Integer, TestRun> runs = new ConcurrentHashMap<>();

  @RepeatedTest(1)
  void testRunRemovalListener() throws InterruptedException {
    System.out.printf("%n%nSTART%n%n");

    for (int i = 1; i <= NUMBER_OF_RUNS; i++) {
      runs.put(i, runTest(i));
    }
    Thread.currentThread().setName(getClass().getSimpleName());
    for (int i = 1; i <= NUMBER_OF_RUNS; i++) {
      System.out.printf("#%s - WAIT%n", i);
      var run = runs.get(i);
      boolean finished = run.latch.await(5, TimeUnit.SECONDS);
      if (!finished) {
        System.out.printf("debug for inspection");
        run.cache.cleanUp();
      }
    }
    System.out.printf("%n%nEND%n%n");
  }

  private TestRun runTest(final int run) {
    var list = Collections.synchronizedList(new ArrayList<>());
    var latch = new CountDownLatch(NUMBER_OF_KEYS);

    Cache<Object, Object> cache = Caffeine.newBuilder()
        .removalListener((key, value, cause) -> {
          //System.out.printf("#%s - %s: %s=%s%n", run, cause, key, value);
          latch.countDown();
          list.add(value);
        })
        .expireAfterWrite(Duration.ofMillis(5))
        .scheduler(Scheduler.systemScheduler())
        .executor(cacheExecutor)
        .build();
    for (int i = 0; i < NUMBER_OF_KEYS; i++) {
      var key = i;
      testExecutor.execute(() -> cache.put(key, VALUE));
    }
    return new TestRun(cache, list, latch);
  }

  class TestRun {
    Cache<Object, Object> cache;
    List<Object> notifications;
    CountDownLatch latch;

    TestRun(Cache<Object, Object> cache, List<Object> notifications, CountDownLatch latch) {
      this.notifications = notifications;
      this.cache = cache;
      this.latch = latch;
    }
  }
}

ben-manes (Owner) commented on Jan 13, 2023

I think that the optimistic rescheduling could use the provided scheduler safely because we know that it should perform the work sometime in the future and not simply loop onto the calling thread immediately. The code change would be something like the following, which passes your test.

if (drainStatusOpaque() == REQUIRED) {
  if (executor == ForkJoinPool.commonPool()) {
    scheduleDrainBuffers();
    return;
  }

  var pacer = pacer();
  if ((pacer == null) || (pacer.future != null)) {
    return;
  }
  synchronized (pacer) {
    if (pacer.future == null) {
      pacer.schedule(executor, drainBuffersTask,
          expirationTicker().read(), Pacer.TOLERANCE);
    }
  }
}

mario-schwede-hivemq (Author) commented on Jan 13, 2023

It's good that you found the likely cause of the problem and also proposed a fix. Your explanation and the updated test make total sense.

The commonPool is not an option in my case, because it is shared JVM-wide and I have no control over (or insight into) its other usages.

Your finding also shows me that my workaround does not work. It just changes the cleanup timing a little:
executor.scheduleWithFixedDelay(cache::cleanUp, 1, 1, TimeUnit.SECONDS)

I thought a little about what you wrote regarding business logic in the callbacks. I totally agree that there cannot be any assumptions about when a callback will be called. But it should at least be called. This is important not only for business logic, but also in other cases, like cleaning up resources.

I also agree that the most common case is ongoing interaction with the cache, but it can always happen that, for whatever reason, the interaction stops. In that case, nothing should linger in the cache when a scheduler is configured.

I really appreciate your time on this and your detailed explanations.

ben-manes (Owner) commented on Jan 13, 2023

Your finding also shows me that my workaround does not work. It just changes the cleanup timing a little

Your workaround is fine because it resolves the problem continuously. My fix would do the same, except that by inspecting the internals it can schedule only when necessary instead of blindly and periodically.

I totally agree that there can not be any assumptions about the timing when a callback will be called. But it should be at least be called.

Right, and the listener is always called; it's just that various tradeoffs result in there being no strict guarantee about when. That can cause an unexpected delay, as in your findings, but not a missed invocation.

In this case, nothing should hang around in the cache anymore when a scheduler is configured.

I agree. Hopefully, I'll get a chance to work on the unit tests this weekend and release a fix with our proposed changes.

Thanks for the bug report, reproducer, and patience!

Added 2 commits that reference this issue on Jan 16, 2023: 463afd3, 18133a7

ben-manes (Owner) commented on Feb 10, 2023

Released in 3.1.3

sagansfault commented on Apr 18, 2024

Will this change not be implemented for pre-v3 (Java 8) Caffeine?

ben-manes (Owner) commented on Apr 18, 2024

Correct, it's not planned for 2.x. This is a non-critical quality-of-life improvement, and the old behavior was allowed by the API contract. Java 8 has reached EOL for public security fixes, so those users are already accepting a very limited amount of support and have a high risk tolerance.

3 remaining items
