Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10579: Make Reflections thread safe to resolve flaky NPE scanning failure #14020

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,23 @@
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.Store;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/**
* A {@link PluginScanner} implementation which uses reflection and {@link ServiceLoader} to discover plugins.
Expand Down Expand Up @@ -159,5 +165,61 @@ protected void scan(URL url) {
}
}
}

@Override
protected void scan() {
// The super constructor instantiates a store to be used in this method.
// Replace the store with a custom subclass, then call the normal scan to use the custom store.
store = new InternalStore();
super.scan();
}
}

private static class InternalStore extends Store {

private static final Field STORE_MAP_FIELD = storeMapField();
private final ConcurrentHashMap<String, Map<String, Collection<String>>> storeMap;

private static Field storeMapField() {
try {
Field field = Store.class.getDeclaredField("storeMap");
field.setAccessible(true);
return field;
} catch (Throwable e) {
log.error("Unable to access org.reflections.Store#storeMap, falling back to default behavior", e);
return null;
}
}

public InternalStore() {
super();
storeMap = storeMap();
}

@SuppressWarnings("unchecked")
private ConcurrentHashMap<String, Map<String, Collection<String>>> storeMap() {
if (STORE_MAP_FIELD == null) {
return null;
}
try {
return (ConcurrentHashMap<String, Map<String, Collection<String>>>) STORE_MAP_FIELD.get(this);
} catch (Throwable e) {
log.error("Unable to access org.reflections.Store#storeMap, falling back to default behavior", e);
return null;
}
}

@Override
public boolean put(String index, String key, String value) {
if (storeMap != null) {
// When Reflections is used for parallel scans, it has a bug where concurrent calls to add() cause
// nulls to appear in the non-thread-safe ArrayList. Override in order to insert synchronized ArrayLists.
return storeMap.computeIfAbsent(index, s -> new ConcurrentHashMap<>())
.computeIfAbsent(key, s -> Collections.synchronizedList(new ArrayList<>()))
.add(value);
} else {
return super.put(index, key, value);
}
}
}
}