From 507c1b6a1ada3b90afffcca01367ed1d58faadb5 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Sat, 27 Apr 2024 20:14:00 -0400 Subject: [PATCH] ESQL: Add Block#lookup method (#107982) This adds a method to build a new `Block` by looking up the values in an existing `Block`. Like `BlockHash#lookup` this returns a `ReleasableIterator`. This should allow us to load values using the results of `BlockHash#lookup`. --- x-pack/plugin/esql/compute/build.gradle | 27 ++++ .../compute/data/BooleanArrayBlock.java | 7 + .../compute/data/BooleanBigArrayBlock.java | 7 + .../compute/data/BooleanBlock.java | 5 + .../compute/data/BooleanLookup.java | 96 ++++++++++++++ .../compute/data/BooleanVectorBlock.java | 8 ++ .../compute/data/BytesRefArrayBlock.java | 7 + .../compute/data/BytesRefBlock.java | 5 + .../compute/data/BytesRefLookup.java | 99 ++++++++++++++ .../compute/data/BytesRefVectorBlock.java | 8 ++ .../compute/data/DoubleArrayBlock.java | 7 + .../compute/data/DoubleBigArrayBlock.java | 7 + .../compute/data/DoubleBlock.java | 5 + .../compute/data/DoubleLookup.java | 96 ++++++++++++++ .../compute/data/DoubleVectorBlock.java | 8 ++ .../compute/data/IntArrayBlock.java | 7 + .../compute/data/IntBigArrayBlock.java | 7 + .../elasticsearch/compute/data/IntBlock.java | 5 + .../elasticsearch/compute/data/IntLookup.java | 96 ++++++++++++++ .../compute/data/IntVectorBlock.java | 8 ++ .../compute/data/LongArrayBlock.java | 7 + .../compute/data/LongBigArrayBlock.java | 7 + .../elasticsearch/compute/data/LongBlock.java | 5 + .../compute/data/LongLookup.java | 96 ++++++++++++++ .../compute/data/LongVectorBlock.java | 8 ++ .../blockhash/PackedValuesBlockHash.java | 14 +- .../org/elasticsearch/compute/data/Block.java | 34 +++++ .../compute/data/ConstantNullBlock.java | 7 + .../elasticsearch/compute/data/DocBlock.java | 7 + .../compute/data/OrdinalBytesRefBlock.java | 7 + .../compute/data/X-ArrayBlock.java.st | 12 +- .../compute/data/X-BigArrayBlock.java.st | 7 + .../compute/data/X-Block.java.st | 5 + .../compute/data/X-Lookup.java.st | 111 ++++++++++++++++ .../compute/data/X-VectorBlock.java.st | 8 ++ .../compute/data/BasicBlockTests.java | 122 ++++++++++++++++++ .../compute/data/BigArrayVectorTests.java | 41 ++++++ .../compute/data/BlockMultiValuedTests.java | 87 +++++++++++++ 38 files changed, 1090 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanLookup.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefLookup.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleLookup.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntLookup.java create mode 100644 x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongLookup.java create mode 100644 x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st diff --git a/x-pack/plugin/esql/compute/build.gradle b/x-pack/plugin/esql/compute/build.gradle index 98fd8b0a1aa5e..b4fb7637bc679 100644 --- a/x-pack/plugin/esql/compute/build.gradle +++ b/x-pack/plugin/esql/compute/build.gradle @@ -350,6 +350,33 @@ tasks.named('stringTemplates').configure { it.inputFile = stateInputFile it.outputFile = "org/elasticsearch/compute/aggregation/DoubleState.java" } + // block builders + File lookupInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st") + template { + it.properties = intProperties + it.inputFile = lookupInputFile + it.outputFile = "org/elasticsearch/compute/data/IntLookup.java" + } + template { + it.properties = longProperties + it.inputFile = lookupInputFile + it.outputFile = "org/elasticsearch/compute/data/LongLookup.java" + } + template { + it.properties = doubleProperties + it.inputFile = lookupInputFile + it.outputFile = "org/elasticsearch/compute/data/DoubleLookup.java" + } + template { + it.properties = bytesRefProperties + it.inputFile = lookupInputFile + it.outputFile = "org/elasticsearch/compute/data/BytesRefLookup.java" + } + template { + it.properties = booleanProperties + it.inputFile = lookupInputFile + it.outputFile = "org/elasticsearch/compute/data/BooleanLookup.java" + } File arrayStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-ArrayState.java.st") template { it.properties = intProperties diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java index 2ec68d268ae8a..254d56f849768 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java @@ -9,6 +9,8 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -112,6 +114,11 @@ public BooleanBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new BooleanLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.BOOLEAN; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java index 51418445713b0..aac728236b136 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBigArrayBlock.java @@ -9,7 +9,9 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -113,6 +115,11 @@ public BooleanBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new BooleanLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.BOOLEAN; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java index f365a2ed78610..8ae2984018640 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanBlock.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -38,6 +40,9 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo @Override BooleanBlock filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override BooleanBlock expand(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanLookup.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanLookup.java new file mode 100644 index 0000000000000..f969e164eef68 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanLookup.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link BooleanBlock}s. + * This class is generated. Do not edit it. + */ +final class BooleanLookup implements ReleasableIterator { + private final BooleanBlock values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private boolean first; + private int valuesInPosition; + + BooleanLookup(BooleanBlock values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public BooleanBlock next() { + try (BooleanBlock.Builder builder = positions.blockFactory().newBooleanBlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendBoolean(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy(BooleanBlock.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { + first = values.getBoolean(i); + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.appendBoolean(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } + builder.appendBoolean(values.getBoolean(i)); + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBlock.java index 70fcfeca94869..013718bb42a7d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanVectorBlock.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -49,6 +51,12 @@ public BooleanBlock filter(int... positions) { return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new BooleanLookup(this, positions, targetBlockSize); + } + @Override public BooleanBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java index 8eaf07b473a3a..c33bd12b74bbd 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java @@ -10,7 +10,9 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BytesRefArray; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -116,6 +118,11 @@ public BytesRefBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new BytesRefLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.BYTES_REF; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java index a6c75dbc1122f..d3afcfd6dde4d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefBlock.java @@ -12,6 +12,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -42,6 +44,9 @@ public sealed interface BytesRefBlock extends Block permits BytesRefArrayBlock, @Override BytesRefBlock filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override BytesRefBlock expand(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefLookup.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefLookup.java new file mode 100644 index 0000000000000..3ec62902fe048 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefLookup.java @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link BytesRefBlock}s. + * This class is generated. Do not edit it. + */ +final class BytesRefLookup implements ReleasableIterator { + private final BytesRef firstScratch = new BytesRef(); + private final BytesRef valueScratch = new BytesRef(); + private final BytesRefBlock values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private BytesRef first; + private int valuesInPosition; + + BytesRefLookup(BytesRefBlock values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public BytesRefBlock next() { + try (BytesRefBlock.Builder builder = positions.blockFactory().newBytesRefBlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendBytesRef(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy(BytesRefBlock.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { + first = values.getBytesRef(i, firstScratch); + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.appendBytesRef(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } + builder.appendBytesRef(values.getBytesRef(i, valueScratch)); + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBlock.java index 8c8c3b59ff758..9838fde8a0ffe 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefVectorBlock.java @@ -8,6 +8,8 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -50,6 +52,12 @@ public BytesRefBlock filter(int... positions) { return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new BytesRefLookup(this, positions, targetBlockSize); + } + @Override public BytesRefBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java index d545fca4fca8d..4d923e4ca77c8 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java @@ -9,6 +9,8 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -112,6 +114,11 @@ public DoubleBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new DoubleLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.DOUBLE; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java index 5698f40b530b7..203856f88c4ce 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBigArrayBlock.java @@ -9,7 +9,9 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -113,6 +115,11 @@ public DoubleBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new DoubleLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.DOUBLE; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java index a682c2cba019e..95f318703df62 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleBlock.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -38,6 +40,9 @@ public sealed interface DoubleBlock extends Block permits DoubleArrayBlock, Doub @Override DoubleBlock filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override DoubleBlock expand(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleLookup.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleLookup.java new file mode 100644 index 0000000000000..bcb8a414f7c57 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleLookup.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link DoubleBlock}s. + * This class is generated. Do not edit it. + */ +final class DoubleLookup implements ReleasableIterator { + private final DoubleBlock values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private double first; + private int valuesInPosition; + + DoubleLookup(DoubleBlock values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public DoubleBlock next() { + try (DoubleBlock.Builder builder = positions.blockFactory().newDoubleBlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendDouble(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy(DoubleBlock.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { + first = values.getDouble(i); + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.appendDouble(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } + builder.appendDouble(values.getDouble(i)); + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBlock.java index eec6675e93ae7..e76a4e0c5fdee 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleVectorBlock.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -49,6 +51,12 @@ public DoubleBlock filter(int... positions) { return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new DoubleLookup(this, positions, targetBlockSize); + } + @Override public DoubleBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java index 41c9d3b84485d..6231e8f9c5a10 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java @@ -9,6 +9,8 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -112,6 +114,11 @@ public IntBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new IntLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.INT; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java index 66c0b15415418..a1e84db8c4f27 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBigArrayBlock.java @@ -9,7 +9,9 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -113,6 +115,11 @@ public IntBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new IntLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.INT; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java index e9d606b51c6a1..21d40170151a5 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntBlock.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -38,6 +40,9 @@ public sealed interface IntBlock extends Block permits IntArrayBlock, IntVectorB @Override IntBlock filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override IntBlock expand(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntLookup.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntLookup.java new file mode 100644 index 0000000000000..b7ea15cd9d818 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntLookup.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link IntBlock}s. + * This class is generated. Do not edit it. + */ +final class IntLookup implements ReleasableIterator { + private final IntBlock values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private int first; + private int valuesInPosition; + + IntLookup(IntBlock values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public IntBlock next() { + try (IntBlock.Builder builder = positions.blockFactory().newIntBlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendInt(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy(IntBlock.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { + first = values.getInt(i); + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.appendInt(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } + builder.appendInt(values.getInt(i)); + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBlock.java index 39f8426a8da3a..70bcf6919bea6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntVectorBlock.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -49,6 +51,12 @@ public IntBlock filter(int... positions) { return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new IntLookup(this, positions, targetBlockSize); + } + @Override public IntBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java index 56370f718bae0..d8357e5d367cc 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java @@ -9,6 +9,8 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -112,6 +114,11 @@ public LongBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new LongLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.LONG; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java index e3b17cc7be5d4..0ccd4ab368659 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBigArrayBlock.java @@ -9,7 +9,9 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -113,6 +115,11 @@ public LongBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new LongLookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.LONG; diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java index 3e1c5fcfaac95..5a11ee8e2a6e3 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongBlock.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -38,6 +40,9 @@ public sealed interface LongBlock extends Block permits LongArrayBlock, LongVect @Override LongBlock filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override LongBlock expand(); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongLookup.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongLookup.java new file mode 100644 index 0000000000000..ca1b06d70b1d1 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongLookup.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link LongBlock}s. + * This class is generated. Do not edit it. + */ +final class LongLookup implements ReleasableIterator { + private final LongBlock values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private long first; + private int valuesInPosition; + + LongLookup(LongBlock values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public LongBlock next() { + try (LongBlock.Builder builder = positions.blockFactory().newLongBlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendLong(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy(LongBlock.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { + first = values.getLong(i); + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.appendLong(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } + builder.appendLong(values.getLong(i)); + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBlock.java index b573e025c0be1..b6f1e8e77505d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongVectorBlock.java @@ -7,6 +7,8 @@ package org.elasticsearch.compute.data; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -49,6 +51,12 @@ public LongBlock filter(int... positions) { return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new LongLookup(this, positions, targetBlockSize); + } + @Override public LongBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index 769155db5ecfa..809c433a000a7 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -59,7 +59,6 @@ */ final class PackedValuesBlockHash extends BlockHash { static final int DEFAULT_BATCH_SIZE = Math.toIntExact(ByteSizeValue.ofKb(10).getBytes()); - private static final long MAX_LOOKUP = 100_000; private final int emitBatchSize; private final BytesRefHash bytesRefHash; @@ -183,14 +182,14 @@ public ReleasableIterator lookup(Page page, ByteSizeValue targetBlockS class LookupWork implements ReleasableIterator { private final Group[] groups; - private final long targetBytesSize; + private final long targetByteSize; private final int positionCount; private int position; - LookupWork(Page page, long targetBytesSize, int batchSize) { + LookupWork(Page page, long targetByteSize, int batchSize) { this.groups = specs.stream().map(s -> new Group(s, page, batchSize)).toArray(Group[]::new); this.positionCount = page.getPositionCount(); - this.targetBytesSize = targetBytesSize; + this.targetByteSize = targetByteSize; } @Override @@ -200,9 +199,10 @@ public boolean hasNext() { @Override public IntBlock next() { - int size = Math.toIntExact(Math.min(Integer.MAX_VALUE, targetBytesSize / Integer.BYTES / 2)); + int size = Math.toIntExact(Math.min(Integer.MAX_VALUE, targetByteSize / Integer.BYTES / 2)); try (IntBlock.Builder ords = blockFactory.newIntBlockBuilder(size)) { - while (position < positionCount && ords.estimatedBytes() < targetBytesSize) { + while (position < positionCount && ords.estimatedBytes() < targetByteSize) { + // TODO a test where targetByteSize is very small should still make a few rows. boolean singleEntry = startPosition(groups); if (singleEntry) { lookupSingleEntry(ords); @@ -247,7 +247,7 @@ private void lookupMultipleEntries(IntBlock.Builder ords) { } ords.appendInt(Math.toIntExact(found)); count++; - if (count > MAX_LOOKUP) { + if (count > Block.MAX_LOOKUP) { // TODO replace this with a warning and break throw new IllegalArgumentException("Found a single entry with " + count + " entries"); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java index 709ad4165170d..ed7ee93c99325 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/Block.java @@ -10,8 +10,10 @@ import org.apache.lucene.util.Accountable; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.BlockLoader; @@ -36,6 +38,11 @@ * the same block at the same time. */ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, RefCounted, Releasable { + /** + * The maximum number of values that can be added to one position via lookup. + * TODO maybe make this everywhere? + */ + long MAX_LOOKUP = 100_000; /** * {@return an efficient dense single-value view of this block}. @@ -114,6 +121,33 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R */ Block filter(int... positions); + /** + * Builds an Iterator of new {@link Block}s with the same {@link #elementType} + * as this Block whose values are copied from positions in this Block. It has the + * same number of {@link #getPositionCount() positions} as the {@code positions} + * parameter. + *

+ * For example, this this block contained {@code [a, b, [b, c]]} + * and were called with the block {@code [0, 1, 1, [1, 2]]} then the + * result would be {@code [a, b, b, [b, b, c]]}. + *

+ *

+ * This process produces {@code count(this) * count(positions)} values per + * positions which could be quite quite large. Instead of returning a single + * Block, this returns an Iterator of Blocks containing all of the promised + * values. + *

+ *

+ * The returned {@link ReleasableIterator} may retain a reference to {@link Block}s + * inside the {@link Page}. Close it to release those references. + *

+ *

+ * This block is built using the same {@link BlockFactory} as was used to + * build the {@code positions} parameter. + *

+ */ + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + /** * How are multivalued fields ordered? * Some operators can enable its optimization when mv_values are sorted ascending or de-duplicated. diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java index bdeb5334e0da7..1baa4d2283b25 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/ConstantNullBlock.java @@ -11,6 +11,8 @@ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import java.io.IOException; import java.util.Objects; @@ -75,6 +77,11 @@ public ConstantNullBlock filter(int... positions) { return (ConstantNullBlock) blockFactory().newConstantNullBlock(positions.length); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return ReleasableIterator.single((ConstantNullBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount())); + } + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Block.class, "ConstantNullBlock", diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java index f454abe7d2cfe..e5a0d934aa01a 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/DocBlock.java @@ -8,6 +8,8 @@ package org.elasticsearch.compute.data; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -48,6 +50,11 @@ public Block filter(int... positions) { return new DocBlock(asVector().filter(positions)); } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + throw new UnsupportedOperationException(); + } + @Override public DocBlock expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java index 64e3faca1f517..41ab5256e9109 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/OrdinalBytesRefBlock.java @@ -9,6 +9,8 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -118,6 +120,11 @@ public BytesRefBlock filter(int... positions) { } } + @Override + public ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new BytesRefLookup(this, positions, targetBlockSize); + } + @Override protected void closeInternal() { Releasables.close(ordinals, bytes); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st index 9b153317c8a0e..1de2fa239e61e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st @@ -11,15 +11,16 @@ $if(BytesRef)$ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BytesRefArray; -import org.elasticsearch.core.Releasables; - $else$ import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +$endif$ +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; -$endif$ import java.io.IOException; import java.util.BitSet; @@ -132,6 +133,11 @@ $endif$ } } + @Override + public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new $Type$Lookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.$TYPE$; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st index 53f0bb09640c5..66bdcc5d39fb0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-BigArrayBlock.java.st @@ -9,7 +9,9 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.$Array$; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import java.io.IOException; @@ -113,6 +115,11 @@ public final class $Type$BigArrayBlock extends AbstractArrayBlock implements $Ty } } + @Override + public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + return new $Type$Lookup(this, positions, targetBlockSize); + } + @Override public ElementType elementType() { return ElementType.$TYPE$; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st index 331a5713fa3d1..b9d3dfc1f16ff 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Block.java.st @@ -14,6 +14,8 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.index.mapper.BlockLoader; import java.io.IOException; @@ -58,6 +60,9 @@ $endif$ @Override $Type$Block filter(int... positions); + @Override + ReleasableIterator lookup(IntBlock positions, ByteSizeValue targetBlockSize); + @Override $Type$Block expand(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st new file mode 100644 index 0000000000000..668752fe3f59f --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st @@ -0,0 +1,111 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.data; + +$if(BytesRef)$ +import org.apache.lucene.util.BytesRef; +$endif$ +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; + +/** + * Generic {@link Block#lookup} implementation {@link $Type$Block}s. + * This class is generated. Do not edit it. + */ +final class $Type$Lookup implements ReleasableIterator<$Type$Block> { +$if(BytesRef)$ + private final BytesRef firstScratch = new BytesRef(); + private final BytesRef valueScratch = new BytesRef(); +$endif$ + private final $Type$Block values; + private final IntBlock positions; + private final long targetByteSize; + private int position; + + private $type$ first; + private int valuesInPosition; + + $Type$Lookup($Type$Block values, IntBlock positions, ByteSizeValue targetBlockSize) { + values.incRef(); + positions.incRef(); + this.values = values; + this.positions = positions; + this.targetByteSize = targetBlockSize.getBytes(); + } + + @Override + public boolean hasNext() { + return position < positions.getPositionCount(); + } + + @Override + public $Type$Block next() { + try ($Type$Block.Builder builder = positions.blockFactory().new$Type$BlockBuilder(positions.getTotalValueCount())) { + int count = 0; + while (position < positions.getPositionCount()) { + int start = positions.getFirstValueIndex(position); + int end = start + positions.getValueCount(position); + valuesInPosition = 0; + for (int i = start; i < end; i++) { + copy(builder, positions.getInt(i)); + } + switch (valuesInPosition) { + case 0 -> builder.appendNull(); + case 1 -> builder.append$Type$(first); + default -> builder.endPositionEntry(); + } + position++; + // TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET? + if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) { + break; + } + } + return builder.build(); + } + } + + private void copy($Type$Block.Builder builder, int valuePosition) { + if (valuePosition >= values.getPositionCount()) { + return; + } + int start = values.getFirstValueIndex(valuePosition); + int end = start + values.getValueCount(valuePosition); + for (int i = start; i < end; i++) { + if (valuesInPosition == 0) { +$if(BytesRef)$ + first = values.get$Type$(i, firstScratch); +$else$ + first = values.get$Type$(i); +$endif$ + valuesInPosition++; + continue; + } + if (valuesInPosition == 1) { + builder.beginPositionEntry(); + builder.append$Type$(first); + } + if (valuesInPosition > Block.MAX_LOOKUP) { + // TODO replace this with a warning and break + throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries"); + } +$if(BytesRef)$ + builder.append$Type$(values.get$Type$(i, valueScratch)); +$else$ + builder.append$Type$(values.get$Type$(i)); +$endif$ + valuesInPosition++; + } + } + + @Override + public void close() { + Releasables.close(values, positions); + } +} diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBlock.java.st index 8df5cea4c883b..274457a4d5bd8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-VectorBlock.java.st @@ -10,6 +10,8 @@ package org.elasticsearch.compute.data; $if(BytesRef)$ import org.apache.lucene.util.BytesRef; $endif$ +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; /** @@ -57,6 +59,12 @@ $endif$ return vector.filter(positions).asBlock(); } + @Override + public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) { + // TODO optimizations + return new $Type$Lookup(this, positions, targetBlockSize); + } + @Override public $Type$Block expand() { incRef(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java index ee505704f762b..6852cd52862b2 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BasicBlockTests.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.ReleasableIterator; import org.elasticsearch.core.Releasables; import org.elasticsearch.geo.GeometryTestUtils; import org.elasticsearch.geo.ShapeTestUtils; @@ -38,6 +39,7 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; +import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN; import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO; import static org.hamcrest.Matchers.containsString; @@ -47,6 +49,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -192,6 +195,11 @@ public void testIntBlock() { int pos = block.getInt(randomPosition(positionCount)); assertThat(pos, is(block.getInt(pos))); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup(block, positions(blockFactory, 1, 2, new int[] { 1, 2 }), List.of(List.of(1), List.of(2), List.of(1, 2))); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); try (IntBlock.Builder blockBuilder = blockFactory.newIntBlockBuilder(1)) { IntBlock copy = blockBuilder.copyFrom(block, 0, block.getPositionCount()).build(); @@ -237,6 +245,15 @@ public void testConstantIntBlock() { assertThat(value, is(block.getInt(randomPosition(positionCount)))); assertThat(block.isNull(randomPosition(positionCount)), is(false)); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(value), List.of(value), List.of(value, value)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); releaseAndAssertBreaker(block); } } @@ -261,6 +278,11 @@ public void testLongBlock() { int pos = (int) block.getLong(randomPosition(positionCount)); assertThat((long) pos, is(block.getLong(pos))); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup(block, positions(blockFactory, 1, 2, new int[] { 1, 2 }), List.of(List.of(1L), List.of(2L), List.of(1L, 2L))); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); try (LongBlock.Builder blockBuilder = blockFactory.newLongBlockBuilder(1)) { LongBlock copy = blockBuilder.copyFrom(block, 0, block.getPositionCount()).build(); @@ -303,6 +325,15 @@ public void testConstantLongBlock() { assertThat(value, is(block.getLong(randomPosition(positionCount)))); assertThat(block.isNull(randomPosition(positionCount)), is(false)); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(value), List.of(value), List.of(value, value)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); releaseAndAssertBreaker(block); } } @@ -328,6 +359,11 @@ public void testDoubleBlock() { int pos = (int) block.getDouble(randomPosition(positionCount)); assertThat((double) pos, is(block.getDouble(pos))); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup(block, positions(blockFactory, 1, 2, new int[] { 1, 2 }), List.of(List.of(1d), List.of(2d), List.of(1d, 2d))); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); try (DoubleBlock.Builder blockBuilder = blockFactory.newDoubleBlockBuilder(1)) { DoubleBlock copy = blockBuilder.copyFrom(block, 0, block.getPositionCount()).build(); @@ -371,6 +407,15 @@ public void testConstantDoubleBlock() { assertThat(value, is(block.getDouble(positionCount - 1))); assertThat(value, is(block.getDouble(randomPosition(positionCount)))); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(value), List.of(value), List.of(value, value)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); releaseAndAssertBreaker(block); } } @@ -409,6 +454,15 @@ private void testBytesRefBlock(Supplier byteArraySupplier, boolean cho assertions.accept(bytes); } assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(values[1]), List.of(values[2]), List.of(values[1], values[2])) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); try (BytesRefBlock.Builder blockBuilder = blockFactory.newBytesRefBlockBuilder(1)) { BytesRefBlock copy = blockBuilder.copyFrom(block, 0, block.getPositionCount()).build(); @@ -511,6 +565,15 @@ public void testConstantBytesRefBlock() { bytes = block.getBytesRef(randomPosition(positionCount), bytes); assertThat(bytes, is(value)); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(value), List.of(value), List.of(value, value)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); releaseAndAssertBreaker(block); } } @@ -537,6 +600,15 @@ public void testBooleanBlock() { assertThat(block.getBoolean(0), is(true)); assertThat(block.getBoolean(positionCount - 1), is((positionCount - 1) % 10 == 0)); assertSingleValueDenseBlock(block); + if (positionCount > 1) { + assertLookup( + block, + positions(blockFactory, 1, 0, new int[] { 1, 0 }), + List.of(List.of(false), List.of(true), List.of(false, true)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); try (BooleanBlock.Builder blockBuilder = blockFactory.newBooleanBlockBuilder(1)) { BooleanBlock copy = blockBuilder.copyFrom(block, 0, block.getPositionCount()).build(); @@ -577,6 +649,15 @@ public void testConstantBooleanBlock() { assertThat(block.getBoolean(positionCount - 1), is(value)); assertThat(block.getBoolean(randomPosition(positionCount)), is(value)); assertSingleValueDenseBlock(block); + if (positionCount > 2) { + assertLookup( + block, + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(value), List.of(value), List.of(value, value)) + ); + } + assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, block); releaseAndAssertBreaker(block); } } @@ -1383,4 +1464,45 @@ private Block randomBigArrayBlock() { } }; } + + static IntBlock positions(BlockFactory blockFactory, Object... positions) { + try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(positions.length)) { + for (Object p : positions) { + if (p instanceof int[] mv) { + builder.beginPositionEntry(); + for (int v : mv) { + builder.appendInt(v); + } + builder.endPositionEntry(); + continue; + } + if (p instanceof Integer v) { + builder.appendInt(v); + continue; + } + throw new IllegalArgumentException("invalid position: " + p + "(" + p.getClass().getName() + ")"); + } + return builder.build(); + } + } + + static void assertEmptyLookup(BlockFactory blockFactory, Block block) { + try ( + IntBlock positions = positions(blockFactory); + ReleasableIterator lookup = block.lookup(positions, ByteSizeValue.ofKb(100)) + ) { + assertThat(lookup.hasNext(), equalTo(false)); + } + } + + static void assertLookup(Block block, IntBlock positions, List> expected) { + try (positions; ReleasableIterator lookup = block.lookup(positions, ByteSizeValue.ofKb(100))) { + assertThat(lookup.hasNext(), equalTo(true)); + try (Block b = lookup.next()) { + assertThat(valuesAtPositions(b, 0, b.getPositionCount()), equalTo(expected)); + assertThat(b.blockFactory(), sameInstance(positions.blockFactory())); + } + assertThat(lookup.hasNext(), equalTo(false)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java index 74d7e3e142d04..067cff2feba08 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BigArrayVectorTests.java @@ -17,8 +17,13 @@ import org.elasticsearch.test.EqualsHashCodeTestUtils; import java.io.IOException; +import java.util.List; import java.util.stream.IntStream; +import static java.util.Collections.singletonList; +import static org.elasticsearch.compute.data.BasicBlockTests.assertEmptyLookup; +import static org.elasticsearch.compute.data.BasicBlockTests.assertLookup; +import static org.elasticsearch.compute.data.BasicBlockTests.positions; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -53,6 +58,15 @@ public void testBoolean() throws IOException { } }); BasicBlockTests.assertSingleValueDenseBlock(vector.asBlock()); + if (positionCount > 1) { + assertLookup( + vector.asBlock(), + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(values[1]), List.of(values[2]), List.of(values[1], values[2])) + ); + } + assertLookup(vector.asBlock(), positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, vector.asBlock()); assertSerialization(block); assertThat(vector.toString(), containsString("BooleanBigArrayVector[positions=" + positionCount)); } @@ -84,6 +98,15 @@ public void testInt() throws IOException { } }); BasicBlockTests.assertSingleValueDenseBlock(vector.asBlock()); + if (positionCount > 1) { + assertLookup( + vector.asBlock(), + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(values[1]), List.of(values[2]), List.of(values[1], values[2])) + ); + } + assertLookup(vector.asBlock(), positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, vector.asBlock()); assertSerialization(block); assertThat(vector.toString(), containsString("IntBigArrayVector[positions=" + positionCount)); } @@ -115,6 +138,15 @@ public void testLong() throws IOException { } }); BasicBlockTests.assertSingleValueDenseBlock(vector.asBlock()); + if (positionCount > 1) { + assertLookup( + vector.asBlock(), + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(values[1]), List.of(values[2]), List.of(values[1], values[2])) + ); + } + assertLookup(vector.asBlock(), positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, vector.asBlock()); assertSerialization(block); assertThat(vector.toString(), containsString("LongBigArrayVector[positions=" + positionCount)); } @@ -146,6 +178,15 @@ public void testDouble() throws IOException { } }); BasicBlockTests.assertSingleValueDenseBlock(vector.asBlock()); + if (positionCount > 1) { + assertLookup( + vector.asBlock(), + positions(blockFactory, 1, 2, new int[] { 1, 2 }), + List.of(List.of(values[1]), List.of(values[2]), List.of(values[1], values[2])) + ); + } + assertLookup(vector.asBlock(), positions(blockFactory, positionCount + 1000), singletonList(null)); + assertEmptyLookup(blockFactory, vector.asBlock()); assertSerialization(block); assertThat(vector.toString(), containsString("DoubleBigArrayVector[positions=" + positionCount)); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index 1b0e61cea8135..4579eb688d95e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -17,15 +17,20 @@ import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.ReleasableIterator; +import org.elasticsearch.core.Releasables; import org.elasticsearch.test.ESTestCase; import org.junit.After; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.IntUnaryOperator; import java.util.stream.IntStream; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class BlockMultiValuedTests extends ESTestCase { @ParametersFactory @@ -104,6 +109,18 @@ public void testFilteredJumbledSubsetThenExpanded() { assertFilteredThenExpanded(false, true); } + public void testLookupFromSingleOnePage() { + assertLookup(ByteSizeValue.ofMb(100), between(1, 32), p -> 1); + } + + public void testLookupFromManyOnePage() { + assertLookup(ByteSizeValue.ofMb(100), between(1, 32), p -> between(1, 5)); + } + + public void testLookupFromSingleManyPages() { + assertLookup(ByteSizeValue.ofBytes(1), between(1, 32), p -> 1); + } + private void assertFiltered(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); @@ -212,4 +229,74 @@ public void allBreakersEmpty() throws Exception { assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L)); } } + + private void assertLookup(ByteSizeValue targetBytes, int positionsToCopy, IntUnaryOperator positionsPerPosition) { + BlockFactory positionsFactory = blockFactory(); + int positionCount = randomIntBetween(100, 16 * 1024); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 100, 0, 0); + try (IntBlock.Builder builder = positionsFactory.newIntBlockBuilder(positionsToCopy);) { + for (int p = 0; p < positionsToCopy; p++) { + int max = positionsPerPosition.applyAsInt(p); + switch (max) { + case 0 -> builder.appendNull(); + case 1 -> builder.appendInt(between(0, positionCount + 100)); + default -> { + builder.beginPositionEntry(); + for (int v = 0; v < max; v++) { + builder.appendInt(between(0, positionCount + 100)); + } + builder.endPositionEntry(); + } + } + } + Block copy = null; + int positionOffset = 0; + try ( + IntBlock positions = builder.build(); + ReleasableIterator lookup = b.block().lookup(positions, targetBytes); + ) { + for (int p = 0; p < positions.getPositionCount(); p++) { + if (copy == null || p - positionOffset == copy.getPositionCount()) { + if (copy != null) { + positionOffset += copy.getPositionCount(); + copy.close(); + } + assertThat(lookup.hasNext(), equalTo(true)); + copy = lookup.next(); + if (positions.getPositionCount() - positionOffset < Operator.MIN_TARGET_PAGE_SIZE) { + assertThat(copy.getPositionCount(), equalTo(positions.getPositionCount() - positionOffset)); + } else { + assertThat(copy.getPositionCount(), greaterThanOrEqualTo(Operator.MIN_TARGET_PAGE_SIZE)); + } + } + List expected = new ArrayList<>(); + int start = positions.getFirstValueIndex(p); + int end = start + positions.getValueCount(p); + for (int i = start; i < end; i++) { + int toCopy = positions.getInt(i); + if (toCopy < b.block().getPositionCount()) { + List v = BasicBlockTests.valuesAtPositions(b.block(), toCopy, toCopy + 1).get(0); + if (v != null) { + expected.addAll(v); + } + } + } + if (expected.isEmpty()) { + assertThat(copy.isNull(p - positionOffset), equalTo(true)); + } else { + assertThat(copy.isNull(p - positionOffset), equalTo(false)); + assertThat( + BasicBlockTests.valuesAtPositions(copy, p - positionOffset, p + 1 - positionOffset).get(0), + equalTo(expected) + ); + } + } + assertThat(lookup.hasNext(), equalTo(false)); + } finally { + Releasables.close(copy); + } + } finally { + b.block().close(); + } + } }