Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure #14020

Closed

Conversation

gharris1727
Copy link
Contributor

@gharris1727 gharris1727 commented Jul 14, 2023

Summary

The Reflections library has a race condition that causes it to sometimes throw an NPE. This NPE crashes the connect worker on startup, both in live environments and in tests, and causes those tests to be flaky. This PR uses reflection to patch the library and eliminate the race condition. This is done instead of upstreaming the patch or forking the library because the library itself is unmaintained and should be phased out.

Alternatively, we can consider upgrading the library to a version which has patched this bug: #14029

Background

The Reflections library makes use of a data structure Store to store the results of scanning for later querying. The scanner writes to the store during Reflections#scan() via the SubTypesScanner. The store is later queried by Reflections#getSubTypesOf.

Due to the slow speed of reflectively discovering all classes on the classpath and plugin.path, the Reflections library is used with a parallel executor, increasing the scanning speed. Unfortunately the parallel mode of the library has some bugs, one of which have already been patched via the InternalReflections subclass.

The parallel mode causes the Store to receive concurrent writes. The javadoc for the class does not specify that it is or isn't thread-safe, but due to the use of ConcurrentHashMap and the support for parallel scanning in Reflections, the class seems intended to be thread-safe.

Symptoms

The failure appears as the following stack trace.

java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
    at org.reflections.Store.getAllIncluding(Store.java:82)
    at org.reflections.Store.getAll(Store.java:93)
    at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
    at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)

The stack trace refers to the old location of this code, but the failure persists in it's new location. The line numbers inside of the Reflections library are still accurate, since we haven't upgraded the version of this library in several years.

Diagnosis

From the stacktrace, the NPE is caused by the argument of ConcurrentHashMap#get(String) being null. Tracing backwards, this value is ultimately read from Store#storeMap, meaning it contains a null in the innermost Collection. Also, it contains this null after all of the concurrent scanning has finished (see Reflections#scan where it waits for all of the submitted futures) so there are no writes racing with the reads. The Store#storeMap contains a null at the end of scanning.

Analyzing the data flow into the Store#storeMap, we can see that there is only one method which writes to it: Store#put(String, String, String), where the last argument is added into the innermost collection. This method is called in various places:

  • Reflections#expandSuperTypes always non-null, and access is single-threaded
  • Store#merge (not used) null iff already null in the other Store, and access is single-threaded
  • XmlSerializer#read (not used) null iff null in the XML file, and access is single-threaded
  • AbstractScanner#put always non-null, but the method is called concurrently

Because the argument appears to be non-null on all active code-paths, it doesn't appear that the null could be coming from a caller. Because there are no null values upon entering the method, and there are when exiting, I believe the method itself must be introducing a null. The only interesting property of this method is that it is called concurrently, so there could be a concurrency bug.

Following the hypothesis that this is due to concurrency, I looked for potentially non-thread-safe stuff in this method implementation:

    public boolean put(String index, String key, String value) {
        return storeMap.computeIfAbsent(index, s -> new ConcurrentHashMap<>())
                .computeIfAbsent(key, s -> new ArrayList<>())
                .add(value);
    }

The innermost Collection turns out to be a non-thread-safe ArrayList instance. From the ArrayList javadoc:

* <p><strong>Note that this implementation is not synchronized.</strong>
 * If multiple threads access an {@code ArrayList} instance concurrently,
 * and at least one of the threads modifies the list structurally, it
 * <i>must</i> be synchronized externally.  (A structural modification is
 * any operation that adds or deletes one or more elements, or explicitly
 * resizes the backing array; merely setting the value of an element is not
 * a structural modification.)  This is typically accomplished by
 * synchronizing on some object that naturally encapsulates the list.

With this specific usage pattern, the ArrayList#add() is not synchronized, and structurally modifies the list. Once the ConcurrentHashMap#computeIfAbsent calls complete, there is no synchronization between the different threads operating on the returned Collection instance, and the racing writes can cause unexpected behavior. I found references online which indicate that one symptom of concurrent use of an ArrayList is the appearance of nulls when none were explicitly inserted, the effect we were seeing in the Store#storeMap contents.

Reflective fix

The Store class is not final, so we can subclass it to override behaviors, and then inject the custom store into the existing InternalReflections class used for the existing fix. Unfortunately the Store#storeMap instance variable is private, meaning that we can't simply override the put method. I elected to use reflection to make the field visible and override the single put method, because patching the behavior without reflection required overriding every read method, essentially copying the whole class. If the reflective accesses fail, the InternalStore falls back to the Store behavior, re-introducing the race condition.

Rather than replacing the ArrayList, I chose to use Collections.synchronizedList to make it synchronized. I plan on running some manual performance tests to see how this impacts the scanning time, and we can evaluate changing the collection to something more performant.

Backport

This change is targeted at the ReflectionScanner on trunk, but I can easily re-write it to target the DelegatingLoader on <= 3.5. The bug has been present ever since the scanning was made parallel in #4561 so there are a lot of affected branches that could benefit from this patch, but the severity may not warrant any backporting.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

…ng failure

Signed-off-by: Greg Harris <greg.harris@aiven.io>
@gharris1727
Copy link
Contributor Author

I did n=10 runs before and after, scanning ~150 plugin packages (~3.3G total disk space) downloaded from the internet.

The synchronization appears to add on average 0.7% extra execution time (from 44.69s to 44.99s) and increases the standard deviation from 387ms to 1332ms. I think that this is an inconsequential change in execution time, so this change should be safe to merge from a performance standpoint.

@gharris1727 gharris1727 marked this pull request as ready for review July 17, 2023 17:21
@gharris1727
Copy link
Contributor Author

Closing this in favor of upgrading the library.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
1 participant