Skip to content

Commit

Permalink
Add data-stream auto-sharding APM metrics (#107593)
Browse files Browse the repository at this point in the history
Add APM metrics to monitor data stream auto-sharding events. The new metrics are:
- es.auto_sharding.increase_shards.total
- es.auto_sharding.decrease_shards.total
- es.auto_sharding.cooldown_prevented_increase.total
- es.auto_sharding.cooldown_prevented_decrease.total

The first two track situations where the shard count increases or decreases during a rollover. The latter two track cases where the auto-sharding logic recommends an increase or decrease but the shard change does not take place because we are in a cooldown period following a recent auto-sharding event.
  • Loading branch information
parkertimmins committed Apr 26, 2024
1 parent 0c41cb7 commit 3ed42f3
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 22 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/107593.yaml
@@ -0,0 +1,5 @@
pr: 107593
summary: Add auto-sharding APM metrics
area: Infra/Metrics
type: enhancement
issues: []
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.Condition;
import org.elasticsearch.action.admin.indices.rollover.MaxDocsCondition;
import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService;
import org.elasticsearch.action.admin.indices.rollover.OptimalShardCountCondition;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.autosharding.AutoShardingType;
import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -49,7 +51,11 @@
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
Expand All @@ -60,14 +66,17 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService.DATA_STREAMS_AUTO_SHARDING_ENABLED;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.DEFAULT_TIMESTAMP_FIELD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -77,7 +86,12 @@ public class DataStreamAutoshardingIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    // Register TestTelemetryPlugin alongside the existing plugins so the APM
    // auto-sharding counters emitted during rollover can be captured and
    // asserted by this test suite.
    return List.of(
        DataStreamsPlugin.class,
        MockTransportService.TestPlugin.class,
        TestAutoshardingPlugin.class,
        TestTelemetryPlugin.class
    );
}

@Before
Expand Down Expand Up @@ -109,6 +123,7 @@ public void testRolloverOnAutoShardCondition() throws Exception {
indexDocs(dataStreamName, randomIntBetween(100, 200));

{
resetTelemetry();
ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
String assignedShardNodeId = clusterStateBeforeRollover.routingTable()
Expand Down Expand Up @@ -152,11 +167,14 @@ public void testRolloverOnAutoShardCondition() throws Exception {
assertThat(metConditions.get(0).value(), instanceOf(Integer.class));
int autoShardingRolloverInfo = (int) metConditions.get(0).value();
assertThat(autoShardingRolloverInfo, is(5));

assertTelemetry(MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.get(AutoShardingType.INCREASE_SHARDS));
}

// let's do another rollover now that will not increase the number of shards because the increase shards cooldown has not lapsed,
// however the rollover will use the existing/previous auto shard configuration and the new generation index will have 5 shards
{
resetTelemetry();
ClusterState clusterStateBeforeRollover = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
DataStream dataStreamBeforeRollover = clusterStateBeforeRollover.getMetadata().dataStreams().get(dataStreamName);
String assignedShardNodeId = clusterStateBeforeRollover.routingTable()
Expand Down Expand Up @@ -193,6 +211,8 @@ public void testRolloverOnAutoShardCondition() throws Exception {

// we remained on 5 shards due to the increase shards cooldown
assertThat(thirdGenerationMeta.getNumberOfShards(), is(5));

assertTelemetry(MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.get(AutoShardingType.COOLDOWN_PREVENTED_INCREASE));
}

{
Expand Down Expand Up @@ -566,4 +586,44 @@ private static void mockStatsForIndex(
}
}
}

/**
 * Clears the collected measurements on every node's {@link TestTelemetryPlugin}
 * so that each assertion block starts from a clean telemetry slate.
 */
private static void resetTelemetry() {
    for (PluginsService plugins : internalCluster().getInstances(PluginsService.class)) {
        plugins.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow().resetMeter();
    }
}

/**
 * Asserts that, across the whole cluster, exactly one auto-sharding measurement
 * was recorded for {@code expectedEmittedMetric} (with a long value of 1) and
 * that none of the other auto-sharding counters emitted anything.
 *
 * @param expectedEmittedMetric the metric name expected to have one measurement
 */
private static void assertTelemetry(String expectedEmittedMetric) {
    Map<String, List<Measurement>> measurements = new HashMap<>();
    for (PluginsService pluginsService : internalCluster().getInstances(PluginsService.class)) {
        final TestTelemetryPlugin telemetryPlugin = pluginsService.filterPlugins(TestTelemetryPlugin.class).findFirst().orElseThrow();

        telemetryPlugin.collect();

        // Every auto-sharding counter must be registered on each node, even those
        // that were never incremented.
        List<String> autoShardingMetrics = telemetryPlugin.getRegisteredMetrics(InstrumentType.LONG_COUNTER)
            .stream()
            .filter(metric -> metric.startsWith("es.auto_sharding."))
            .sorted()
            .toList();

        // Fix: the original used assertEquals(actual, expected) — JUnit expects
        // (expected, actual), so failure messages were inverted. Hamcrest's
        // assertThat(actual, matcher) makes the roles explicit.
        assertThat(autoShardingMetrics, is(MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values().stream().sorted().toList()));

        // Aggregate each metric's measurements across all nodes before asserting.
        for (String metricName : MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values()) {
            measurements.computeIfAbsent(metricName, n -> new ArrayList<>())
                .addAll(telemetryPlugin.getLongCounterMeasurement(metricName));
        }
    }

    // assert other metrics not emitted
    MetadataRolloverService.AUTO_SHARDING_METRIC_NAMES.values()
        .stream()
        .filter(metric -> metric.equals(expectedEmittedMetric) == false)
        .forEach(metric -> assertThat(measurements.get(metric), empty()));

    // The expected metric is emitted exactly once, as a long counter increment of 1.
    assertThat(measurements.get(expectedEmittedMetric), hasSize(1));
    Measurement measurement = measurements.get(expectedEmittedMetric).get(0);
    assertThat(measurement.getLong(), is(1L));
    assertFalse(measurement.isDouble());
}
}
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.script.ScriptCompiler;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -272,13 +273,15 @@ public void setup() throws Exception {
indicesService,
xContentRegistry()
);
TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
rolloverService = new MetadataRolloverService(
testThreadPool,
createIndexService,
indexAliasesService,
EmptySystemIndices.INSTANCE,
WriteLoadForecaster.DEFAULT,
clusterService
clusterService,
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
}

Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -88,14 +89,16 @@ public void testRolloverClusterStateForDataStream() throws Exception {
);
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();

ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(createSettingsProvider(xContentRegistry())),
xContentRegistry()
xContentRegistry(),
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
Expand Down Expand Up @@ -184,14 +187,16 @@ public void testRolloverAndMigrateDataStream() throws Exception {
);
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();

ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(createSettingsProvider(xContentRegistry())),
xContentRegistry()
xContentRegistry(),
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
Expand Down Expand Up @@ -271,14 +276,15 @@ public void testChangingIndexModeFromTimeSeriesToSomethingElseNoEffectOnExisting
);
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();

final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(createSettingsProvider(xContentRegistry())),
xContentRegistry()
xContentRegistry(),
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
Expand Down Expand Up @@ -336,14 +342,16 @@ public void testRolloverClusterStateWithBrokenOlderTsdbDataStream() throws Excep
int numberOfBackingIndices = randomIntBetween(1, 3);
ClusterState clusterState = createClusterState(dataStreamName, numberOfBackingIndices, now, true);
DataStream dataStream = clusterState.metadata().dataStreams().get(dataStreamName);

ThreadPool testThreadPool = new TestThreadPool(getTestName());
final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();

try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(createSettingsProvider(xContentRegistry())),
xContentRegistry()
xContentRegistry(),
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
Expand Down Expand Up @@ -417,14 +425,15 @@ public void testRolloverClusterStateWithBrokenTsdbDataStream() throws Exception
int numberOfBackingIndices = randomIntBetween(1, 3);
ClusterState clusterState = createClusterState(dataStreamName, numberOfBackingIndices, now, false);
DataStream dataStream = clusterState.metadata().dataStreams().get(dataStreamName);

final TestTelemetryPlugin telemetryPlugin = new TestTelemetryPlugin();
ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
MetadataRolloverService rolloverService = DataStreamTestHelper.getMetadataRolloverService(
dataStream,
testThreadPool,
Set.of(createSettingsProvider(xContentRegistry())),
xContentRegistry()
xContentRegistry(),
telemetryPlugin.getTelemetryProvider(Settings.EMPTY)
);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
Expand Down
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.datastreams.autosharding.AutoShardingResult;
import org.elasticsearch.action.datastreams.autosharding.AutoShardingType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
Expand Down Expand Up @@ -46,6 +47,8 @@
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.threadpool.ThreadPool;

import java.time.Instant;
Expand All @@ -70,15 +73,25 @@ public class MetadataRolloverService {
private static final Logger logger = LogManager.getLogger(MetadataRolloverService.class);
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
private static final List<IndexAbstraction.Type> VALID_ROLLOVER_TARGETS = List.of(ALIAS, DATA_STREAM);

public static final Settings HIDDEN_INDEX_SETTINGS = Settings.builder().put(IndexMetadata.SETTING_INDEX_HIDDEN, true).build();
public static final Map<AutoShardingType, String> AUTO_SHARDING_METRIC_NAMES = Map.of(
AutoShardingType.INCREASE_SHARDS,
"es.auto_sharding.increase_shards.total",
AutoShardingType.DECREASE_SHARDS,
"es.auto_sharding.decrease_shards.total",
AutoShardingType.COOLDOWN_PREVENTED_INCREASE,
"es.auto_sharding.cooldown_prevented_increase.total",
AutoShardingType.COOLDOWN_PREVENTED_DECREASE,
"es.auto_sharding.cooldown_prevented_decrease.total"
);

private final ThreadPool threadPool;
private final MetadataCreateIndexService createIndexService;
private final MetadataIndexAliasesService indexAliasesService;
private final SystemIndices systemIndices;
private final WriteLoadForecaster writeLoadForecaster;
private final ClusterService clusterService;
private final MeterRegistry meterRegistry;

@Inject
public MetadataRolloverService(
Expand All @@ -87,14 +100,23 @@ public MetadataRolloverService(
MetadataIndexAliasesService indexAliasesService,
SystemIndices systemIndices,
WriteLoadForecaster writeLoadForecaster,
ClusterService clusterService
ClusterService clusterService,
TelemetryProvider telemetryProvider
) {
this.threadPool = threadPool;
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.systemIndices = systemIndices;
this.writeLoadForecaster = writeLoadForecaster;
this.clusterService = clusterService;
this.meterRegistry = telemetryProvider.getMeterRegistry();

for (var entry : AUTO_SHARDING_METRIC_NAMES.entrySet()) {
final AutoShardingType type = entry.getKey();
final String metricName = entry.getValue();
final String description = String.format(Locale.ROOT, "auto-sharding %s counter", type.name().toLowerCase(Locale.ROOT));
meterRegistry.registerLongCounter(metricName, description, "unit");
}
}

public record RolloverResult(String rolloverIndexName, String sourceIndexName, ClusterState clusterState) {
Expand Down Expand Up @@ -330,6 +352,13 @@ private RolloverResult rolloverDataStream(
(builder, indexMetadata) -> builder.put(dataStream.rolloverFailureStore(indexMetadata.getIndex(), newGeneration))
);
} else {
if (autoShardingResult != null) {
final String metricName = AUTO_SHARDING_METRIC_NAMES.get(autoShardingResult.type());
if (metricName != null) {
meterRegistry.getLongCounter(metricName).increment();
}
}

DataStreamAutoShardingEvent dataStreamAutoShardingEvent = autoShardingResult == null
? dataStream.getAutoShardingEvent()
: switch (autoShardingResult.type()) {
Expand Down
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.cluster;

import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata;
Expand Down Expand Up @@ -120,6 +121,7 @@ public class ClusterModule extends AbstractModule {
final ShardsAllocator shardsAllocator;
private final ShardRoutingRoleStrategy shardRoutingRoleStrategy;
private final AllocationStatsService allocationStatsService;
private final TelemetryProvider telemetryProvider;

public ClusterModule(
Settings settings,
Expand Down Expand Up @@ -157,6 +159,7 @@ public ClusterModule(
);
this.metadataDeleteIndexService = new MetadataDeleteIndexService(settings, clusterService, allocationService);
this.allocationStatsService = new AllocationStatsService(clusterService, clusterInfoService, shardsAllocator, writeLoadForecaster);
this.telemetryProvider = telemetryProvider;
}

static ShardRoutingRoleStrategy getShardRoutingRoleStrategy(List<ClusterPlugin> clusterPlugins) {
Expand Down Expand Up @@ -444,6 +447,8 @@ protected void configure() {
bind(ShardsAllocator.class).toInstance(shardsAllocator);
bind(ShardRoutingRoleStrategy.class).toInstance(shardRoutingRoleStrategy);
bind(AllocationStatsService.class).toInstance(allocationStatsService);
bind(TelemetryProvider.class).toInstance(telemetryProvider);
bind(MetadataRolloverService.class).asEagerSingleton();
}

public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) {
Expand Down

0 comments on commit 3ed42f3

Please sign in to comment.