Skip to content

Commit

Permalink
ESQL: Begin optimizing Block#lookup (#108482)
Browse files Browse the repository at this point in the history
This creates the infrastructure to allow optimizing the `lookup` method
when applied to `Vector`s and then implements that optimization for
constant vectors. Constant vectors now take one of six paths:
1. An empty positions `Block` yields an empty result set.
2. If `positions` is a `Block`, perform the un-optimized lookup.
3. If the `min` of the `positions` *Vector* is less than 0 then throw an
   exception.
4. If the `min` of the positions Vector is greater than the number of
   positions in the lookup block then return a single
   `ConstantNullBlock` because you are looking up outside the range.
5. If the `max` of the positions Vector is less than the number of
   positions in the lookup block then return a `Constant$Type$Block`
   with the same value as the lookup block. This is a lookup that's
   entirely within range.
6. Otherwise return the unoptimized lookup.

This is *fairly* simple but demonstrates how we can plug in more complex
optimizations later.
  • Loading branch information
nik9000 committed May 10, 2024
1 parent 2d14095 commit 04d3b99
Show file tree
Hide file tree
Showing 37 changed files with 431 additions and 28 deletions.
Expand Up @@ -46,4 +46,30 @@ public String toString() {

};
}

/**
 * Returns an iterator that contains no elements. Its {@code hasNext} is always
 * {@code false} and its {@code close} does nothing.
 *
 * @param <T> the element type the caller expects; no element is ever produced
 * @return an iterator that is always exhausted
 */
static <T extends Releasable> ReleasableIterator<T> empty() {
    return new ReleasableIterator<>() {
        @Override
        public boolean hasNext() {
            return false;
        }

        /**
         * Never yields a value. Trips an assertion when assertions are enabled and
         * otherwise throws, instead of silently handing {@code null} to the caller.
         *
         * @throws java.util.NoSuchElementException always, when assertions are disabled
         */
        @Override
        public T next() {
            assert false : "hasNext is always false so next should never be called";
            // Follow the java.util.Iterator convention for an exhausted iterator rather than returning null.
            throw new java.util.NoSuchElementException("ReleasableIterator[<empty>] has no elements");
        }

        @Override
        public void close() {}

        @Override
        public String toString() {
            return "ReleasableIterator[<empty>]";
        }
    };
}
}
Expand Up @@ -10,6 +10,8 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -91,6 +93,11 @@ public BooleanVector filter(int... positions) {
}
}

/**
 * Looks up {@code positions} in this vector by viewing it as a block and handing
 * that block to a {@code BooleanLookup}. No vector-specific shortcut is taken here.
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed through unchanged to {@code BooleanLookup}
 * @return the {@code BooleanLookup} iterator over the looked-up blocks
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var selfAsBlock = asBlock();
    return new BooleanLookup(selfAsBlock, positions, targetBlockSize);
}

/**
 * Estimates the memory footprint of a vector backed by {@code values}: the fixed
 * {@code BASE_RAM_BYTES_USED} overhead plus the size of the array itself.
 *
 * @param values the backing array to measure
 * @return the estimated number of bytes used
 */
public static long ramBytesEstimated(boolean[] values) {
    long arrayBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + arrayBytes;
}
Expand Down
Expand Up @@ -10,8 +10,10 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand Down Expand Up @@ -87,6 +89,11 @@ public BooleanVector filter(int... positions) {
return new BooleanBigArrayVector(filtered, positions.length, blockFactory);
}

/**
 * Looks up {@code positions} in this vector through the general-purpose
 * {@code BooleanLookup}, using this vector's block view as the lookup source.
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed through unchanged to {@code BooleanLookup}
 * @return the {@code BooleanLookup} iterator over the looked-up blocks
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var source = asBlock();
    return new BooleanLookup(source, positions, targetBlockSize);
}

@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link BitArray} is adjusted outside
Expand Down
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand All @@ -27,6 +29,9 @@ public sealed interface BooleanVector extends Vector permits ConstantBooleanVect
/**
 * Redeclared with a narrower return type so that filtering a {@code BooleanVector}
 * is statically known to produce a {@code BooleanVector}.
 */
@Override
BooleanVector filter(int... positions);

/**
 * Redeclared with a narrower return type: the returned iterator produces
 * {@link BooleanBlock}s (or a subtype), so callers need no cast.
 */
@Override
ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BooleanVector, and both vectors are {@link #equals(BooleanVector, BooleanVector) equal}.
Expand Down
Expand Up @@ -52,9 +52,8 @@ public BooleanBlock filter(int... positions) {
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BooleanLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}

@Override
Expand Down
Expand Up @@ -11,7 +11,9 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -91,6 +93,11 @@ public BytesRefVector filter(int... positions) {
}
}

/**
 * Looks up {@code positions} in this vector by viewing it as a block and handing
 * that block to a {@code BytesRefLookup}. No vector-specific shortcut is taken here.
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed through unchanged to {@code BytesRefLookup}
 * @return the {@code BytesRefLookup} iterator over the looked-up blocks
 */
@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var selfAsBlock = asBlock();
    return new BytesRefLookup(selfAsBlock, positions, targetBlockSize);
}

/**
 * Estimates the memory footprint of a vector backed by {@code values}: the fixed
 * {@code BASE_RAM_BYTES_USED} overhead plus the size reported for the array.
 *
 * @param values the backing {@code BytesRefArray} to measure
 * @return the estimated number of bytes used
 */
public static long ramBytesEstimated(BytesRefArray values) {
    long arrayBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + arrayBytes;
}
Expand Down
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand All @@ -34,6 +36,9 @@ public sealed interface BytesRefVector extends Vector permits ConstantBytesRefVe
/**
 * Redeclared with a narrower return type so that filtering a {@code BytesRefVector}
 * is statically known to produce a {@code BytesRefVector}.
 */
@Override
BytesRefVector filter(int... positions);

/**
 * Redeclared with a narrower return type: the returned iterator produces
 * {@link BytesRefBlock}s (or a subtype), so callers need no cast.
 */
@Override
ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BytesRefVector, and both vectors are {@link #equals(BytesRefVector, BytesRefVector) equal}.
Expand Down
Expand Up @@ -63,9 +63,8 @@ public BytesRefBlock filter(int... positions) {
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BytesRefLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}

@Override
Expand Down
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant boolean value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public BooleanVector filter(int... positions) {
return blockFactory().newConstantBooleanVector(value, positions.length);
}

/**
 * Looks up {@code positions} in this constant vector, short-circuiting whenever
 * every requested position is guaranteed to produce the same result.
 * <ol>
 * <li>No positions: an empty iterator.</li>
 * <li>{@code positions} has no vector view ({@code asVector()} is {@code null}):
 *     the general {@code BooleanLookup}.</li>
 * <li>The smallest position is negative: an {@link IllegalArgumentException}.</li>
 * <li>The smallest position is at or past the end of this vector: a single
 *     constant-null block, because every lookup is out of range.</li>
 * <li>The largest position is inside this vector: a single constant block holding
 *     this vector's value.</li>
 * <li>Otherwise (some positions in range, some not): the general {@code BooleanLookup}.</li>
 * </ol>
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed to {@code BooleanLookup} on the unoptimized paths
 * @return an iterator over the looked-up blocks
 * @throws IllegalArgumentException if the smallest requested position is negative
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        return new BooleanLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Positions are zero-based, so min == getPositionCount() is already past the last valid position.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((BooleanBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        return ReleasableIterator.single(positions.blockFactory().newConstantBooleanBlockWith(value, positions.getPositionCount()));
    }
    return new BooleanLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BOOLEAN;
Expand Down
Expand Up @@ -9,6 +9,8 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant BytesRef value.
Expand Down Expand Up @@ -45,6 +47,28 @@ public BytesRefVector filter(int... positions) {
return blockFactory().newConstantBytesRefVector(value, positions.length);
}

/**
 * Looks up {@code positions} in this constant vector, short-circuiting whenever
 * every requested position is guaranteed to produce the same result.
 * <ol>
 * <li>No positions: an empty iterator.</li>
 * <li>{@code positions} has no vector view ({@code asVector()} is {@code null}):
 *     the general {@code BytesRefLookup}.</li>
 * <li>The smallest position is negative: an {@link IllegalArgumentException}.</li>
 * <li>The smallest position is at or past the end of this vector: a single
 *     constant-null block, because every lookup is out of range.</li>
 * <li>The largest position is inside this vector: a single constant block holding
 *     this vector's value.</li>
 * <li>Otherwise (some positions in range, some not): the general {@code BytesRefLookup}.</li>
 * </ol>
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed to {@code BytesRefLookup} on the unoptimized paths
 * @return an iterator over the looked-up blocks
 * @throws IllegalArgumentException if the smallest requested position is negative
 */
@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        return new BytesRefLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Positions are zero-based, so min == getPositionCount() is already past the last valid position.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((BytesRefBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        return ReleasableIterator.single(positions.blockFactory().newConstantBytesRefBlockWith(value, positions.getPositionCount()));
    }
    return new BytesRefLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BYTES_REF;
Expand Down
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant double value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public DoubleVector filter(int... positions) {
return blockFactory().newConstantDoubleVector(value, positions.length);
}

/**
 * Looks up {@code positions} in this constant vector, short-circuiting whenever
 * every requested position is guaranteed to produce the same result.
 * <ol>
 * <li>No positions: an empty iterator.</li>
 * <li>{@code positions} has no vector view ({@code asVector()} is {@code null}):
 *     the general {@code DoubleLookup}.</li>
 * <li>The smallest position is negative: an {@link IllegalArgumentException}.</li>
 * <li>The smallest position is at or past the end of this vector: a single
 *     constant-null block, because every lookup is out of range.</li>
 * <li>The largest position is inside this vector: a single constant block holding
 *     this vector's value.</li>
 * <li>Otherwise (some positions in range, some not): the general {@code DoubleLookup}.</li>
 * </ol>
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed to {@code DoubleLookup} on the unoptimized paths
 * @return an iterator over the looked-up blocks
 * @throws IllegalArgumentException if the smallest requested position is negative
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        return new DoubleLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Positions are zero-based, so min == getPositionCount() is already past the last valid position.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((DoubleBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        return ReleasableIterator.single(positions.blockFactory().newConstantDoubleBlockWith(value, positions.getPositionCount()));
    }
    return new DoubleLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.DOUBLE;
Expand Down
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant int value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public IntVector filter(int... positions) {
return blockFactory().newConstantIntVector(value, positions.length);
}

/**
 * Looks up {@code positions} in this constant vector, short-circuiting whenever
 * every requested position is guaranteed to produce the same result.
 * <ol>
 * <li>No positions: an empty iterator.</li>
 * <li>{@code positions} has no vector view ({@code asVector()} is {@code null}):
 *     the general {@code IntLookup}.</li>
 * <li>The smallest position is negative: an {@link IllegalArgumentException}.</li>
 * <li>The smallest position is at or past the end of this vector: a single
 *     constant-null block, because every lookup is out of range.</li>
 * <li>The largest position is inside this vector: a single constant block holding
 *     this vector's value.</li>
 * <li>Otherwise (some positions in range, some not): the general {@code IntLookup}.</li>
 * </ol>
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed to {@code IntLookup} on the unoptimized paths
 * @return an iterator over the looked-up blocks
 * @throws IllegalArgumentException if the smallest requested position is negative
 */
@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        return new IntLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Positions are zero-based, so min == getPositionCount() is already past the last valid position.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((IntBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        return ReleasableIterator.single(positions.blockFactory().newConstantIntBlockWith(value, positions.getPositionCount()));
    }
    return new IntLookup(asBlock(), positions, targetBlockSize);
}

/**
* The minimum value in the block.
*/
Expand Down
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

/**
* Vector implementation that stores a constant long value.
Expand Down Expand Up @@ -39,6 +41,28 @@ public LongVector filter(int... positions) {
return blockFactory().newConstantLongVector(value, positions.length);
}

/**
 * Looks up {@code positions} in this constant vector, short-circuiting whenever
 * every requested position is guaranteed to produce the same result.
 * <ol>
 * <li>No positions: an empty iterator.</li>
 * <li>{@code positions} has no vector view ({@code asVector()} is {@code null}):
 *     the general {@code LongLookup}.</li>
 * <li>The smallest position is negative: an {@link IllegalArgumentException}.</li>
 * <li>The smallest position is at or past the end of this vector: a single
 *     constant-null block, because every lookup is out of range.</li>
 * <li>The largest position is inside this vector: a single constant block holding
 *     this vector's value.</li>
 * <li>Otherwise (some positions in range, some not): the general {@code LongLookup}.</li>
 * </ol>
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed to {@code LongLookup} on the unoptimized paths
 * @return an iterator over the looked-up blocks
 * @throws IllegalArgumentException if the smallest requested position is negative
 */
@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        return new LongLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Positions are zero-based, so min == getPositionCount() is already past the last valid position.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((LongBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        return ReleasableIterator.single(positions.blockFactory().newConstantLongBlockWith(value, positions.getPositionCount()));
    }
    return new LongLookup(asBlock(), positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.LONG;
Expand Down
Expand Up @@ -10,6 +10,8 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -90,6 +92,11 @@ public DoubleVector filter(int... positions) {
}
}

/**
 * Looks up {@code positions} in this vector by viewing it as a block and handing
 * that block to a {@code DoubleLookup}. No vector-specific shortcut is taken here.
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed through unchanged to {@code DoubleLookup}
 * @return the {@code DoubleLookup} iterator over the looked-up blocks
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var selfAsBlock = asBlock();
    return new DoubleLookup(selfAsBlock, positions, targetBlockSize);
}

/**
 * Estimates the memory footprint of a vector backed by {@code values}: the fixed
 * {@code BASE_RAM_BYTES_USED} overhead plus the size of the array itself.
 *
 * @param values the backing array to measure
 * @return the estimated number of bytes used
 */
public static long ramBytesEstimated(double[] values) {
    long arrayBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + arrayBytes;
}
Expand Down
Expand Up @@ -10,8 +10,10 @@
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;

import java.io.IOException;

Expand Down Expand Up @@ -86,6 +88,11 @@ public DoubleVector filter(int... positions) {
return new DoubleBigArrayVector(filtered, positions.length, blockFactory);
}

/**
 * Looks up {@code positions} in this vector through the general-purpose
 * {@code DoubleLookup}, using this vector's block view as the lookup source.
 *
 * @param positions the positions to look up
 * @param targetBlockSize passed through unchanged to {@code DoubleLookup}
 * @return the {@code DoubleLookup} iterator over the looked-up blocks
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var source = asBlock();
    return new DoubleLookup(source, positions, targetBlockSize);
}

@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link DoubleArray} is adjusted outside
Expand Down

0 comments on commit 04d3b99

Please sign in to comment.