diff --git a/docs/changelog/107967.yaml b/docs/changelog/107967.yaml new file mode 100644 index 0000000000000..159370e44f236 --- /dev/null +++ b/docs/changelog/107967.yaml @@ -0,0 +1,6 @@ +pr: 107967 +summary: Sort time series indices by time range in `GetDataStreams` API +area: TSDB +type: bug +issues: + - 102088 diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java index d4d62c2829172..f7064eb39a015 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportAction.java @@ -151,46 +151,61 @@ static GetDataStreamAction.Response innerOperation( GetDataStreamAction.Response.TimeSeries timeSeries = null; if (dataStream.getIndexMode() == IndexMode.TIME_SERIES) { - List> ranges = new ArrayList<>(); - Tuple current = null; - String previousIndexName = null; - for (Index index : dataStream.getIndices()) { - IndexMetadata indexMetadata = metadata.index(index); - if (indexMetadata.getIndexMode() != IndexMode.TIME_SERIES) { - continue; + record IndexInfo(String name, Instant timeSeriesStart, Instant timeSeriesEnd) implements Comparable { + @Override + public int compareTo(IndexInfo o) { + return Comparator.comparing(IndexInfo::timeSeriesStart).thenComparing(IndexInfo::timeSeriesEnd).compare(this, o); } - Instant start = indexMetadata.getTimeSeriesStart(); - Instant end = indexMetadata.getTimeSeriesEnd(); - if (current == null) { - current = new Tuple<>(start, end); - } else if (current.v2().compareTo(start) == 0) { - current = new Tuple<>(current.v1(), end); - } else if (current.v2().compareTo(start) < 0) { - ranges.add(current); - current = new Tuple<>(start, end); + } + + List> mergedRanges = new ArrayList<>(); + Tuple currentMergedRange = null; + IndexInfo previous = null; + + // We need indices to be sorted by time series range + // to produce temporal ranges. + // But it is not enforced in API, so we explicitly sort here. + var sortedRanges = dataStream.getIndices() + .stream() + .map(metadata::index) + .filter(m -> m.getIndexMode() == IndexMode.TIME_SERIES) + .map(m -> new IndexInfo(m.getIndex().getName(), m.getTimeSeriesStart(), m.getTimeSeriesEnd())) + .sorted() + .toList(); + + for (var info : sortedRanges) { + Instant start = info.timeSeriesStart(); + Instant end = info.timeSeriesEnd(); + + if (currentMergedRange == null) { + currentMergedRange = new Tuple<>(start, end); + } else if (currentMergedRange.v2().compareTo(start) == 0) { + currentMergedRange = new Tuple<>(currentMergedRange.v1(), end); + } else if (currentMergedRange.v2().compareTo(start) < 0) { + mergedRanges.add(currentMergedRange); + currentMergedRange = new Tuple<>(start, end); } else { String message = "previous backing index [" - + previousIndexName + + previous.name() + "] range [" - + current.v1() + + previous.timeSeriesStart() + "/" - + current.v2() + + previous.timeSeriesEnd() + "] range is colliding with current backing [" - + index.getName() + + info.name() + "] index range [" + start + "/" + end + "]"; - assert current.v2().compareTo(start) < 0 : message; - LOGGER.warn(message); + assert currentMergedRange.v2().compareTo(start) < 0 : message; } - previousIndexName = index.getName(); + previous = info; } - if (current != null) { - ranges.add(current); + if (currentMergedRange != null) { + mergedRanges.add(currentMergedRange); } - timeSeries = new GetDataStreamAction.Response.TimeSeries(ranges); + timeSeries = new GetDataStreamAction.Response.TimeSeries(mergedRanges); } dataStreamInfos.add( diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportActionTests.java index f7616482edd10..58ab69d383464 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportActionTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsTransportActionTests.java @@ -209,6 +209,49 @@ public void testGetTimeSeriesDataStream() { ); } + public void testGetTimeSeriesDataStreamWithOutOfOrderIndices() { + Instant now = Instant.now().truncatedTo(ChronoUnit.SECONDS); + String dataStream = "ds-1"; + Instant sixHoursAgo = now.minus(6, ChronoUnit.HOURS); + Instant fourHoursAgo = now.minus(4, ChronoUnit.HOURS); + Instant twoHoursAgo = now.minus(2, ChronoUnit.HOURS); + Instant twoHoursAhead = now.plus(2, ChronoUnit.HOURS); + + ClusterState state; + { + var mBuilder = new Metadata.Builder(); + DataStreamTestHelper.getClusterStateWithDataStream( + mBuilder, + dataStream, + List.of( + new Tuple<>(fourHoursAgo, twoHoursAgo), + new Tuple<>(sixHoursAgo, fourHoursAgo), + new Tuple<>(twoHoursAgo, twoHoursAhead) + ) + ); + state = ClusterState.builder(new ClusterName("_name")).metadata(mBuilder).build(); + } + + var req = new GetDataStreamAction.Request(new String[] {}); + var response = GetDataStreamsTransportAction.innerOperation( + state, + req, + resolver, + systemIndices, + ClusterSettings.createBuiltInClusterSettings(), + dataStreamGlobalRetentionResolver + ); + assertThat( + response.getDataStreams(), + contains( + allOf( + transformedMatch(d -> d.getDataStream().getName(), equalTo(dataStream)), + transformedMatch(d -> d.getTimeSeries().temporalRanges(), contains(new Tuple<>(sixHoursAgo, twoHoursAhead))) + ) + ) + ); + } + public void testGetTimeSeriesMixedDataStream() { Instant instant = Instant.parse("2023-06-06T14:00:00.000Z").truncatedTo(ChronoUnit.SECONDS); String dataStream1 = "ds-1";