Skip to content

Commit

Permalink
KAFKA-15162: Reflectively find plugins in parent ClassLoaders that ar…
Browse files Browse the repository at this point in the history
…en't on the classpath (apache#13977)

Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewed-by: Chris Egerton <chris.egerton@aiven.io>
  • Loading branch information
gharris1727 authored and jeqo committed Jul 21, 2023
1 parent dd6a8ac commit de7431b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ public static Set<PluginSource> pluginSources(List<Path> pluginLocations, ClassL
log.error("Could not get listing for plugin path: {}. Ignoring.", pluginLocation, e);
}
}
URL[] classpathUrls = ClasspathHelper.forJavaClassPath().toArray(new URL[0]);
pluginSources.add(new PluginSource(null, classLoader.getParent(), classpathUrls));
List<URL> parentUrls = new ArrayList<>();
parentUrls.addAll(ClasspathHelper.forJavaClassPath());
parentUrls.addAll(ClasspathHelper.forClassLoader(classLoader.getParent()));
pluginSources.add(new PluginSource(null, classLoader.getParent(), parentUrls.toArray(new URL[0])));
return pluginSources;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ public void subclassedServiceLoadedPluginShouldNotAppearIsolated() {
}

private void assertClassLoaderReadsVersionFromResource(
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) throws MalformedURLException {
TestPlugin parentResource, TestPlugin childResource, String className, String... expectedVersions) {
URL[] systemPath = TestPlugins.pluginPath(parentResource)
.stream()
.map(Path::toFile)
Expand All @@ -500,6 +500,11 @@ private void assertClassLoaderReadsVersionFromResource(
);
plugins = new Plugins(pluginProps, parent, new ClassLoaderFactory());

assertTrue("Should find plugin in plugin classloader",
plugins.converters().stream().anyMatch(desc -> desc.loader() instanceof PluginClassLoader));
assertTrue("Should find plugin in parent classloader",
plugins.converters().stream().anyMatch(desc -> parent.equals(desc.loader())));

Converter converter = plugins.newPlugin(
className,
new AbstractConfig(new ConfigDef(), Collections.emptyMap()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.components.Versioned;

/**
* Fake plugin class for testing classloading isolation
Expand All @@ -39,7 +40,7 @@
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
public class ReadVersionFromResource implements Converter {
public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

Expand Down Expand Up @@ -78,4 +79,13 @@ public SchemaAndValue toConnectData(final String topic, final byte[] value) {
throw new AssertionError(e);
}
}

@Override
public String version() {
try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
return version(stream);
} catch (IOException e) {
throw new AssertionError(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.components.Versioned;

/**
* Fake plugin class for testing classloading isolation.
Expand All @@ -39,7 +40,7 @@
* Exfiltrates data via {@link ReadVersionFromResource#fromConnectData(String, Schema, Object)}
* and {@link ReadVersionFromResource#toConnectData(String, byte[])}.
*/
public class ReadVersionFromResource implements Converter {
public class ReadVersionFromResource implements Converter, Versioned {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

Expand Down Expand Up @@ -78,4 +79,13 @@ public SchemaAndValue toConnectData(final String topic, final byte[] value) {
throw new AssertionError(e);
}
}

@Override
public String version() {
try (InputStream stream = this.getClass().getResourceAsStream("/version")) {
return version(stream);
} catch (IOException e) {
throw new AssertionError(e);
}
}
}

0 comments on commit de7431b

Please sign in to comment.