Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into lucene_snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticsearchmachine committed Apr 27, 2024
2 parents c4d7716 + 4664ced commit 1799b7e
Show file tree
Hide file tree
Showing 144 changed files with 3,828 additions and 1,088 deletions.
Expand Up @@ -72,13 +72,18 @@ SortedMap<Integer, String> invalidLines() {
* @param args the start-up arguments
* @param processInfo information about the CLI process.
* @param tmpDir the directory that should be passed to {@code -Djava.io.tmpdir}
* @param machineDependentHeap the heap configurator to use
* @return the list of options to put on the Java command line
* @throws InterruptedException if the java subprocess is interrupted
* @throws IOException if there is a problem reading any of the files
* @throws UserException if there is a problem parsing the `jvm.options` file or `jvm.options.d` files
*/
public static List<String> determineJvmOptions(ServerArgs args, ProcessInfo processInfo, Path tmpDir) throws InterruptedException,
IOException, UserException {
public static List<String> determineJvmOptions(
ServerArgs args,
ProcessInfo processInfo,
Path tmpDir,
MachineDependentHeap machineDependentHeap
) throws InterruptedException, IOException, UserException {
final JvmOptionsParser parser = new JvmOptionsParser();

final Map<String, String> substitutions = new HashMap<>();
Expand All @@ -89,7 +94,7 @@ public static List<String> determineJvmOptions(ServerArgs args, ProcessInfo proc

try {
return Collections.unmodifiableList(
parser.jvmOptions(args, args.configDir(), tmpDir, envOptions, substitutions, processInfo.sysprops())
parser.jvmOptions(args, args.configDir(), tmpDir, envOptions, substitutions, processInfo.sysprops(), machineDependentHeap)
);
} catch (final JvmOptionsFileParserException e) {
final String errorMessage = String.format(
Expand Down Expand Up @@ -125,7 +130,8 @@ private List<String> jvmOptions(
Path tmpDir,
final String esJavaOpts,
final Map<String, String> substitutions,
final Map<String, String> cliSysprops
final Map<String, String> cliSysprops,
final MachineDependentHeap machineDependentHeap
) throws InterruptedException, IOException, JvmOptionsFileParserException, UserException {

final List<String> jvmOptions = readJvmOptionsFiles(config);
Expand All @@ -135,10 +141,8 @@ private List<String> jvmOptions(
}

final List<String> substitutedJvmOptions = substitutePlaceholders(jvmOptions, Collections.unmodifiableMap(substitutions));
final MachineDependentHeap machineDependentHeap = new MachineDependentHeap(
new OverridableSystemMemoryInfo(substitutedJvmOptions, new DefaultSystemMemoryInfo())
);
substitutedJvmOptions.addAll(machineDependentHeap.determineHeapSettings(config, substitutedJvmOptions));
final SystemMemoryInfo memoryInfo = new OverridableSystemMemoryInfo(substitutedJvmOptions, new DefaultSystemMemoryInfo());
substitutedJvmOptions.addAll(machineDependentHeap.determineHeapSettings(args.nodeSettings(), memoryInfo, substitutedJvmOptions));
final List<String> ergonomicJvmOptions = JvmErgonomics.choose(substitutedJvmOptions, args.nodeSettings());
final List<String> systemJvmOptions = SystemJvmOptions.systemJvmOptions(args.nodeSettings(), cliSysprops);

Expand Down
Expand Up @@ -8,193 +8,153 @@

package org.elasticsearch.server.cli;

import org.elasticsearch.common.ParsingException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.yaml.YamlXContent;
import org.elasticsearch.node.NodeRoleSettings;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.MASTER_ROLE;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.ML_ROLE;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE;
import static org.elasticsearch.server.cli.JvmOption.isInitialHeapSpecified;
import static org.elasticsearch.server.cli.JvmOption.isMaxHeapSpecified;
import static org.elasticsearch.server.cli.JvmOption.isMinHeapSpecified;

/**
* Determines optimal default heap settings based on available system memory and assigned node roles.
*/
public final class MachineDependentHeap {
public class MachineDependentHeap {
private static final long GB = 1024L * 1024L * 1024L; // 1GB
private static final long MAX_HEAP_SIZE = GB * 31; // 31GB
private static final long MIN_HEAP_SIZE = 1024 * 1024 * 128; // 128MB
private static final int DEFAULT_HEAP_SIZE_MB = 1024;
private static final String ELASTICSEARCH_YML = "elasticsearch.yml";

private final SystemMemoryInfo systemMemoryInfo;

public MachineDependentHeap(SystemMemoryInfo systemMemoryInfo) {
this.systemMemoryInfo = systemMemoryInfo;
}
public MachineDependentHeap() {}

/**
* Calculate heap options.
*
* @param configDir path to config directory
* @param nodeSettings the settings for the node
* @param userDefinedJvmOptions JVM arguments provided by the user
* @return final heap options, or an empty collection if user provided heap options are to be used
* @throws IOException if unable to load elasticsearch.yml
*/
public List<String> determineHeapSettings(Path configDir, List<String> userDefinedJvmOptions) throws IOException, InterruptedException {
public final List<String> determineHeapSettings(
Settings nodeSettings,
SystemMemoryInfo systemMemoryInfo,
List<String> userDefinedJvmOptions
) throws IOException, InterruptedException {
// TODO: this could be more efficient, to only parse final options once
final Map<String, JvmOption> finalJvmOptions = JvmOption.findFinalOptions(userDefinedJvmOptions);
if (isMaxHeapSpecified(finalJvmOptions) || isMinHeapSpecified(finalJvmOptions) || isInitialHeapSpecified(finalJvmOptions)) {
// User has explicitly set memory settings so we use those
return Collections.emptyList();
}

Path config = configDir.resolve(ELASTICSEARCH_YML);
try (InputStream in = Files.newInputStream(config)) {
return determineHeapSettings(in);
}
}

List<String> determineHeapSettings(InputStream config) {
MachineNodeRole nodeRole = NodeRoleParser.parse(config);
List<DiscoveryNodeRole> roles = NodeRoleSettings.NODE_ROLES_SETTING.get(nodeSettings);
long availableSystemMemory = systemMemoryInfo.availableSystemMemory();
return options(nodeRole.heap(availableSystemMemory));
MachineNodeRole nodeRole = mapNodeRole(roles);
return options(getHeapSizeMb(nodeSettings, nodeRole, availableSystemMemory));
}

private static List<String> options(int heapSize) {
return List.of("-Xms" + heapSize + "m", "-Xmx" + heapSize + "m");
}

/**
* Parses role information from elasticsearch.yml and determines machine node role.
*/
static class NodeRoleParser {

@SuppressWarnings("unchecked")
public static MachineNodeRole parse(InputStream config) {
final Settings settings;
try (var parser = YamlXContent.yamlXContent.createParser(XContentParserConfiguration.EMPTY, config)) {
if (parser.currentToken() == null && parser.nextToken() == null) {
settings = null;
protected int getHeapSizeMb(Settings nodeSettings, MachineNodeRole role, long availableMemory) {
return switch (role) {
/*
* Master-only node.
*
* <p>Heap is computed as 60% of total system memory up to a maximum of 31 gigabytes.
*/
case MASTER_ONLY -> mb(min((long) (availableMemory * .6), MAX_HEAP_SIZE));
/*
* Machine learning only node.
*
* <p>Heap is computed as:
* <ul>
* <li>40% of total system memory when total system memory 16 gigabytes or less.</li>
* <li>40% of the first 16 gigabytes plus 10% of memory above that when total system memory is more than 16 gigabytes.</li>
* <li>The absolute maximum heap size is 31 gigabytes.</li>
* </ul>
*
* In all cases the result is rounded down to the next whole multiple of 4 megabytes.
* The reason for doing this is that Java will round requested heap sizes to a multiple
* of 4 megabytes (certainly versions 11 to 18 do this), so by doing this ourselves we
* are more likely to actually get the amount we request. This is worthwhile for ML where
* the ML autoscaling code needs to be able to calculate the JVM size for different sizes
* of ML node, and if Java is also rounding then this causes a discrepancy. It's possible
* that a future version of Java could round to an even bigger number of megabytes, which
* would cause a discrepancy for people using that version of Java. But there's no harm
* in a bit of extra rounding here - it can only reduce discrepancies.
*
* If this formula is changed then corresponding changes must be made to the {@code NativeMemoryCalculator} and
* {@code MlAutoscalingDeciderServiceTests} classes in the ML plugin code. Failure to keep the logic synchronized
* could result in repeated autoscaling up and down.
*/
case ML_ONLY -> {
if (availableMemory <= (GB * 16)) {
yield mb((long) (availableMemory * .4), 4);
} else {
settings = Settings.fromXContent(parser);
yield mb((long) min((GB * 16) * .4 + (availableMemory - GB * 16) * .1, MAX_HEAP_SIZE), 4);
}
} catch (IOException | ParsingException ex) {
// Strangely formatted config, so just return defaults and let startup settings validation catch the problem
return MachineNodeRole.UNKNOWN;
}

if (settings != null && settings.isEmpty() == false) {
List<String> roles = settings.getAsList("node.roles");

if (roles.isEmpty()) {
// If roles are missing or empty (coordinating node) assume defaults and consider this a data node
return MachineNodeRole.DATA;
} else if (containsOnly(roles, "master")) {
return MachineNodeRole.MASTER_ONLY;
} else if (roles.contains("ml") && containsOnly(roles, "ml", "remote_cluster_client")) {
return MachineNodeRole.ML_ONLY;
/*
* Data node. Essentially any node that isn't a master or ML only node.
*
* <p>Heap is computed as:
* <ul>
* <li>40% of total system memory when less than 1 gigabyte with a minimum of 128 megabytes.</li>
* <li>50% of total system memory when greater than 1 gigabyte up to a maximum of 31 gigabytes.</li>
* </ul>
*/
case DATA -> {
if (availableMemory < GB) {
yield mb(max((long) (availableMemory * .4), MIN_HEAP_SIZE));
} else {
return MachineNodeRole.DATA;
yield mb(min((long) (availableMemory * .5), MAX_HEAP_SIZE));
}
} else { // if the config is completely empty, then assume defaults and consider this a data node
return MachineNodeRole.DATA;
}
}

@SuppressWarnings("unchecked")
private static <T> boolean containsOnly(Collection<T> collection, T... items) {
return Arrays.asList(items).containsAll(collection);
}
};
}

enum MachineNodeRole {
/**
* Master-only node.
*
* <p>Heap is computed as 60% of total system memory up to a maximum of 31 gigabytes.
*/
MASTER_ONLY(m -> mb(min((long) (m * .6), MAX_HEAP_SIZE))),

/**
* Machine learning only node.
*
* <p>Heap is computed as:
* <ul>
* <li>40% of total system memory when total system memory 16 gigabytes or less.</li>
* <li>40% of the first 16 gigabytes plus 10% of memory above that when total system memory is more than 16 gigabytes.</li>
* <li>The absolute maximum heap size is 31 gigabytes.</li>
* </ul>
*
* In all cases the result is rounded down to the next whole multiple of 4 megabytes.
* The reason for doing this is that Java will round requested heap sizes to a multiple
* of 4 megabytes (certainly versions 11 to 18 do this), so by doing this ourselves we
* are more likely to actually get the amount we request. This is worthwhile for ML where
* the ML autoscaling code needs to be able to calculate the JVM size for different sizes
* of ML node, and if Java is also rounding then this causes a discrepancy. It's possible
* that a future version of Java could round to an even bigger number of megabytes, which
* would cause a discrepancy for people using that version of Java. But there's no harm
* in a bit of extra rounding here - it can only reduce discrepancies.
*
* If this formula is changed then corresponding changes must be made to the {@code NativeMemoryCalculator} and
* {@code MlAutoscalingDeciderServiceTests} classes in the ML plugin code. Failure to keep the logic synchronized
* could result in repeated autoscaling up and down.
*/
ML_ONLY(m -> mb(m <= (GB * 16) ? (long) (m * .4) : (long) min((GB * 16) * .4 + (m - GB * 16) * .1, MAX_HEAP_SIZE), 4)),

/**
* Data node. Essentially any node that isn't a master or ML only node.
*
* <p>Heap is computed as:
* <ul>
* <li>40% of total system memory when less than 1 gigabyte with a minimum of 128 megabytes.</li>
* <li>50% of total system memory when greater than 1 gigabyte up to a maximum of 31 gigabytes.</li>
* </ul>
*/
DATA(m -> mb(m < GB ? max((long) (m * .4), MIN_HEAP_SIZE) : min((long) (m * .5), MAX_HEAP_SIZE))),

/**
* Unknown role node.
*
* <p>Hard-code heap to a default of 1 gigabyte.
*/
UNKNOWN(m -> DEFAULT_HEAP_SIZE_MB);
protected static int mb(long bytes) {
return (int) (bytes / (1024 * 1024));
}

private final Function<Long, Integer> formula;
protected static int mb(long bytes, int toLowerMultipleOfMb) {
return toLowerMultipleOfMb * (int) (bytes / (1024 * 1024 * toLowerMultipleOfMb));
}

MachineNodeRole(Function<Long, Integer> formula) {
this.formula = formula;
private static MachineNodeRole mapNodeRole(List<DiscoveryNodeRole> roles) {
if (roles.isEmpty()) {
// If roles are missing or empty (coordinating node) assume defaults and consider this a data node
return MachineNodeRole.DATA;
} else if (containsOnly(roles, MASTER_ROLE)) {
return MachineNodeRole.MASTER_ONLY;
} else if (roles.contains(ML_ROLE) && containsOnly(roles, ML_ROLE, REMOTE_CLUSTER_CLIENT_ROLE)) {
return MachineNodeRole.ML_ONLY;
} else {
return MachineNodeRole.DATA;
}
}

/**
* Determine the appropriate heap size for the given role and available system memory.
*
* @param systemMemory total available system memory in bytes
* @return recommended heap size in megabytes
*/
public int heap(long systemMemory) {
return formula.apply(systemMemory);
}
@SuppressWarnings("unchecked")
private static <T> boolean containsOnly(Collection<T> collection, T... items) {
return Arrays.asList(items).containsAll(collection);
}

private static int mb(long bytes) {
return (int) (bytes / (1024 * 1024));
}
private static List<String> options(int heapSize) {
return List.of("-Xms" + heapSize + "m", "-Xmx" + heapSize + "m");
}

private static int mb(long bytes, int toLowerMultipleOfMb) {
return toLowerMultipleOfMb * (int) (bytes / (1024 * 1024 * toLowerMultipleOfMb));
}
protected enum MachineNodeRole {
MASTER_ONLY,
ML_ONLY,
DATA;
}
}
Expand Up @@ -250,7 +250,7 @@ protected Command loadTool(String toolname, String libs) {
// protected to allow tests to override
protected ServerProcess startServer(Terminal terminal, ProcessInfo processInfo, ServerArgs args) throws Exception {
var tempDir = ServerProcessUtils.setupTempDir(processInfo);
var jvmOptions = JvmOptionsParser.determineJvmOptions(args, processInfo, tempDir);
var jvmOptions = JvmOptionsParser.determineJvmOptions(args, processInfo, tempDir, new MachineDependentHeap());
var serverProcessBuilder = new ServerProcessBuilder().withTerminal(terminal)
.withProcessInfo(processInfo)
.withServerArgs(args)
Expand Down

0 comments on commit 1799b7e

Please sign in to comment.