Skip to content

Commit

Permalink
ESQL: Add Block#lookup method (#107982)
Browse files Browse the repository at this point in the history
This adds a method to build a new `Block` by looking up the values in an
existing `Block`. Like `BlockHash#lookup` this returns a
`ReleasableIterator`. This should allow us to load values using the
results of `BlockHash#lookup`.
  • Loading branch information
nik9000 committed Apr 28, 2024
1 parent 4664ced commit 507c1b6
Show file tree
Hide file tree
Showing 38 changed files with 1,090 additions and 10 deletions.
27 changes: 27 additions & 0 deletions x-pack/plugin/esql/compute/build.gradle
Expand Up @@ -350,6 +350,33 @@ tasks.named('stringTemplates').configure {
it.inputFile = stateInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/DoubleState.java"
}
// block builders
File lookupInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/data/X-Lookup.java.st")
template {
it.properties = intProperties
it.inputFile = lookupInputFile
it.outputFile = "org/elasticsearch/compute/data/IntLookup.java"
}
template {
it.properties = longProperties
it.inputFile = lookupInputFile
it.outputFile = "org/elasticsearch/compute/data/LongLookup.java"
}
template {
it.properties = doubleProperties
it.inputFile = lookupInputFile
it.outputFile = "org/elasticsearch/compute/data/DoubleLookup.java"
}
template {
it.properties = bytesRefProperties
it.inputFile = lookupInputFile
it.outputFile = "org/elasticsearch/compute/data/BytesRefLookup.java"
}
template {
it.properties = booleanProperties
it.inputFile = lookupInputFile
it.outputFile = "org/elasticsearch/compute/data/BooleanLookup.java"
}
File arrayStateInputFile = new File("${projectDir}/src/main/java/org/elasticsearch/compute/aggregation/X-ArrayState.java.st")
template {
it.properties = intProperties
Expand Down
Expand Up @@ -9,6 +9,8 @@

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -112,6 +114,11 @@ public BooleanBlock filter(int... positions) {
}
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BooleanLookup(this, positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BOOLEAN;
Expand Down
Expand Up @@ -9,7 +9,9 @@

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -113,6 +115,11 @@ public BooleanBlock filter(int... positions) {
}
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BooleanLookup(this, positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BOOLEAN;
Expand Down
Expand Up @@ -11,6 +11,8 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.index.mapper.BlockLoader;

import java.io.IOException;
Expand Down Expand Up @@ -38,6 +40,9 @@ public sealed interface BooleanBlock extends Block permits BooleanArrayBlock, Bo
@Override
BooleanBlock filter(int... positions);

@Override
ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

@Override
BooleanBlock expand();

Expand Down
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

/**
* Generic {@link Block#lookup} implementation {@link BooleanBlock}s.
* This class is generated. Do not edit it.
*/
final class BooleanLookup implements ReleasableIterator<BooleanBlock> {
private final BooleanBlock values;
private final IntBlock positions;
private final long targetByteSize;
private int position;

private boolean first;
private int valuesInPosition;

BooleanLookup(BooleanBlock values, IntBlock positions, ByteSizeValue targetBlockSize) {
values.incRef();
positions.incRef();
this.values = values;
this.positions = positions;
this.targetByteSize = targetBlockSize.getBytes();
}

@Override
public boolean hasNext() {
return position < positions.getPositionCount();
}

@Override
public BooleanBlock next() {
try (BooleanBlock.Builder builder = positions.blockFactory().newBooleanBlockBuilder(positions.getTotalValueCount())) {
int count = 0;
while (position < positions.getPositionCount()) {
int start = positions.getFirstValueIndex(position);
int end = start + positions.getValueCount(position);
valuesInPosition = 0;
for (int i = start; i < end; i++) {
copy(builder, positions.getInt(i));
}
switch (valuesInPosition) {
case 0 -> builder.appendNull();
case 1 -> builder.appendBoolean(first);
default -> builder.endPositionEntry();
}
position++;
// TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET?
if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) {
break;
}
}
return builder.build();
}
}

private void copy(BooleanBlock.Builder builder, int valuePosition) {
if (valuePosition >= values.getPositionCount()) {
return;
}
int start = values.getFirstValueIndex(valuePosition);
int end = start + values.getValueCount(valuePosition);
for (int i = start; i < end; i++) {
if (valuesInPosition == 0) {
first = values.getBoolean(i);
valuesInPosition++;
continue;
}
if (valuesInPosition == 1) {
builder.beginPositionEntry();
builder.appendBoolean(first);
}
if (valuesInPosition > Block.MAX_LOOKUP) {
// TODO replace this with a warning and break
throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries");
}
builder.appendBoolean(values.getBoolean(i));
valuesInPosition++;
}
}

@Override
public void close() {
Releasables.close(values, positions);
}
}
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.compute.data;

import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

/**
Expand Down Expand Up @@ -49,6 +51,12 @@ public BooleanBlock filter(int... positions) {
return vector.filter(positions).asBlock();
}

@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BooleanLookup(this, positions, targetBlockSize);
}

@Override
public BooleanBlock expand() {
incRef();
Expand Down
Expand Up @@ -10,7 +10,9 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -116,6 +118,11 @@ public BytesRefBlock filter(int... positions) {
}
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new BytesRefLookup(this, positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.BYTES_REF;
Expand Down
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.index.mapper.BlockLoader;

import java.io.IOException;
Expand Down Expand Up @@ -42,6 +44,9 @@ public sealed interface BytesRefBlock extends Block permits BytesRefArrayBlock,
@Override
BytesRefBlock filter(int... positions);

@Override
ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);

@Override
BytesRefBlock expand();

Expand Down
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

/**
* Generic {@link Block#lookup} implementation {@link BytesRefBlock}s.
* This class is generated. Do not edit it.
*/
final class BytesRefLookup implements ReleasableIterator<BytesRefBlock> {
private final BytesRef firstScratch = new BytesRef();
private final BytesRef valueScratch = new BytesRef();
private final BytesRefBlock values;
private final IntBlock positions;
private final long targetByteSize;
private int position;

private BytesRef first;
private int valuesInPosition;

BytesRefLookup(BytesRefBlock values, IntBlock positions, ByteSizeValue targetBlockSize) {
values.incRef();
positions.incRef();
this.values = values;
this.positions = positions;
this.targetByteSize = targetBlockSize.getBytes();
}

@Override
public boolean hasNext() {
return position < positions.getPositionCount();
}

@Override
public BytesRefBlock next() {
try (BytesRefBlock.Builder builder = positions.blockFactory().newBytesRefBlockBuilder(positions.getTotalValueCount())) {
int count = 0;
while (position < positions.getPositionCount()) {
int start = positions.getFirstValueIndex(position);
int end = start + positions.getValueCount(position);
valuesInPosition = 0;
for (int i = start; i < end; i++) {
copy(builder, positions.getInt(i));
}
switch (valuesInPosition) {
case 0 -> builder.appendNull();
case 1 -> builder.appendBytesRef(first);
default -> builder.endPositionEntry();
}
position++;
// TOOD what if the estimate is super huge? should we break even with less than MIN_TARGET?
if (++count > Operator.MIN_TARGET_PAGE_SIZE && builder.estimatedBytes() < targetByteSize) {
break;
}
}
return builder.build();
}
}

private void copy(BytesRefBlock.Builder builder, int valuePosition) {
if (valuePosition >= values.getPositionCount()) {
return;
}
int start = values.getFirstValueIndex(valuePosition);
int end = start + values.getValueCount(valuePosition);
for (int i = start; i < end; i++) {
if (valuesInPosition == 0) {
first = values.getBytesRef(i, firstScratch);
valuesInPosition++;
continue;
}
if (valuesInPosition == 1) {
builder.beginPositionEntry();
builder.appendBytesRef(first);
}
if (valuesInPosition > Block.MAX_LOOKUP) {
// TODO replace this with a warning and break
throw new IllegalArgumentException("Found a single entry with " + valuesInPosition + " entries");
}
builder.appendBytesRef(values.getBytesRef(i, valueScratch));
valuesInPosition++;
}
}

@Override
public void close() {
Releasables.close(values, positions);
}
}
Expand Up @@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

/**
Expand Down Expand Up @@ -50,6 +52,12 @@ public BytesRefBlock filter(int... positions) {
return vector.filter(positions).asBlock();
}

@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BytesRefLookup(this, positions, targetBlockSize);
}

@Override
public BytesRefBlock expand() {
incRef();
Expand Down
Expand Up @@ -9,6 +9,8 @@

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand Down Expand Up @@ -112,6 +114,11 @@ public DoubleBlock filter(int... positions) {
}
}

@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return new DoubleLookup(this, positions, targetBlockSize);
}

@Override
public ElementType elementType() {
return ElementType.DOUBLE;
Expand Down

0 comments on commit 507c1b6

Please sign in to comment.