Skip to content

Commit

Permalink
[improve][fn] Run connectors extraction in parallel (apache#17902)
Browse files Browse the repository at this point in the history
  • Loading branch information
aymkhalil committed Oct 17, 2022
1 parent 4b5de98 commit 574f784
Showing 1 changed file with 72 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.utils.io;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.lang.annotation.Annotation;
Expand All @@ -27,18 +28,25 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.io.ConfigFieldDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.Exceptions;
Expand Down Expand Up @@ -142,49 +150,81 @@ public static TreeMap<String, Connector> searchForConnectors(String connectorsDi
Path path = Paths.get(connectorsDirectory).toAbsolutePath();
log.info("Searching for connectors in {}", path);

TreeMap<String, Connector> connectors = new TreeMap<>();
if (!path.toFile().exists()) {
log.warn("Connectors archive directory not found");
return connectors;
return new TreeMap<>();
}

List<Path> archives = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) {
for (Path archive : stream) {
try {

NarClassLoader ncl = NarClassLoaderBuilder.builder()
.narFile(new File(archive.toString()))
.extractionDirectory(narExtractionDirectory)
.build();

Connector.ConnectorBuilder connectorBuilder = Connector.builder();
ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
log.info("Found connector {} from {}", cntDef, archive);

connectorBuilder.archivePath(archive);
if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
if (!StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
connectorBuilder.sourceConfigFieldDefinitions(ConnectorUtils
.getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
}
}
archives.add(archive);
}
}
if (archives.isEmpty()) {
return new TreeMap<>();
}

if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
if (!StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
connectorBuilder.sinkConfigFieldDefinitions(
ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
}
}
ExecutorService oneTimeExecutor = null;
try {
int nThreads = Math.min(Runtime.getRuntime().availableProcessors(), archives.size());
log.info("Loading {} connector definitions with a thread pool of size {}", archives.size(), nThreads);
oneTimeExecutor = Executors.newFixedThreadPool(nThreads,
new ThreadFactoryBuilder().setNameFormat("connector-extraction-executor-%d").build());
List<CompletableFuture<Map.Entry<String, Connector>>> futures = new ArrayList<>();
for (Path archive : archives) {
CompletableFuture<Map.Entry<String, Connector>> future = CompletableFuture.supplyAsync(() ->
getConnectorDefinitionEntry(archive, narExtractionDirectory), oneTimeExecutor);
futures.add(future);
}

connectorBuilder.classLoader(ncl);
connectorBuilder.connectorDefinition(cntDef);
connectors.put(cntDef.getName(), connectorBuilder.build());
} catch (Throwable t) {
log.warn("Failed to load connector from {}", archive, t);
FutureUtil.waitForAll(futures).join();
return futures.stream()
.map(CompletableFuture::join)
.filter(entry -> entry != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, TreeMap::new));
} finally {
if (oneTimeExecutor != null) {
oneTimeExecutor.shutdown();
}
}
}

/**
 * Loads one connector archive and packages the result as a map entry keyed by the connector name.
 *
 * <p>A {@link NarClassLoader} is built for the archive using the given extraction directory, the
 * {@link ConnectorDefinition} is read through it, and the source/sink config field definitions are
 * attached only when both the corresponding class and config class names are non-empty.
 *
 * <p>Any {@link Throwable} raised while loading is logged at WARN level and turned into a
 * {@code null} return value, so a single broken archive does not fail the whole scan.
 * NOTE(review): the class loader created here is not closed on the failure path - confirm whether
 * that is intended.
 *
 * @param archive path of the NAR archive to load
 * @param narExtractionDirectory directory handed to the NAR class loader builder for extraction
 * @return an entry of connector name to {@link Connector}, or {@code null} if the archive could
 *         not be loaded
 */
private static Map.Entry<String, Connector> getConnectorDefinitionEntry(Path archive,
                                                                        String narExtractionDirectory) {
    try {
        NarClassLoader ncl = NarClassLoaderBuilder.builder()
                .narFile(new File(archive.toString()))
                .extractionDirectory(narExtractionDirectory)
                .build();

        Connector.ConnectorBuilder connectorBuilder = Connector.builder();
        ConnectorDefinition cntDef = ConnectorUtils.getConnectorDefinition(ncl);
        log.info("Found connector {} from {}", cntDef, archive);

        connectorBuilder.archivePath(archive);

        // Source config fields are only resolved when the connector declares both a source class
        // and a source config class.
        if (!StringUtils.isEmpty(cntDef.getSourceClass())
                && !StringUtils.isEmpty(cntDef.getSourceConfigClass())) {
            connectorBuilder.sourceConfigFieldDefinitions(
                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSourceConfigClass()));
        }

        // Same rule for the sink side.
        if (!StringUtils.isEmpty(cntDef.getSinkClass())
                && !StringUtils.isEmpty(cntDef.getSinkConfigClass())) {
            connectorBuilder.sinkConfigFieldDefinitions(
                    ConnectorUtils.getConnectorConfigDefinition(ncl, cntDef.getSinkConfigClass()));
        }

        connectorBuilder.classLoader(ncl);
        connectorBuilder.connectorDefinition(cntDef);
        // Diamond operator keeps the entry properly typed instead of relying on a raw type.
        return new AbstractMap.SimpleEntry<>(cntDef.getName(), connectorBuilder.build());
    } catch (Throwable t) {
        // Best-effort: report the broken archive and signal "nothing loaded" with null.
        log.warn("Failed to load connector from {}", archive, t);
        return null;
    }
}
}

0 comments on commit 574f784

Please sign in to comment.