Skip to content

Commit

Permalink
Merge pull request #1916 from ohadzeliger/streaming-mode
Browse files Browse the repository at this point in the history
Resolve #1915: add support for additional cursor streaming modes
  • Loading branch information
alecgrieser committed Nov 17, 2022
2 parents eda52db + bb1842f commit bf869b8
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Expand Up @@ -25,7 +25,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec
* **Performance** Improvement 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support additional streaming modes: LARGE, MEDIUM, SMALL [(Issue ##1915)](https://github.com/FoundationDB/fdb-record-layer/issues/#1915)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Expand Up @@ -32,5 +32,12 @@ public enum CursorStreamingMode {
/** The client will process records one-at-a-time. */
ITERATOR,
/** The client will load all records immediately, such as with {@link RecordCursor#asList}. */
WANT_ALL
WANT_ALL,
/** Advanced. Transfer data in batches small enough to not be much more expensive than reading individual rows,
* to minimize cost if iteration stops early */
SMALL,
/** Advanced. Transfer data in batches sized in between small and large */
MEDIUM,
/** Advanced. Transfer data in batches large enough to be, in a high-concurrency environment, nearly as efficient as possible */
LARGE
}
Expand Up @@ -252,8 +252,15 @@ public KeyValueCursor build() throws RecordCoreException {

final int limit = scanProperties.getExecuteProperties().getReturnedRowLimit();
final StreamingMode streamingMode;
if (scanProperties.getCursorStreamingMode() == CursorStreamingMode.ITERATOR) {
CursorStreamingMode propertiesStreamingMode = scanProperties.getCursorStreamingMode();
if (propertiesStreamingMode == CursorStreamingMode.ITERATOR) {
streamingMode = StreamingMode.ITERATOR;
} else if (propertiesStreamingMode == CursorStreamingMode.LARGE) {
streamingMode = StreamingMode.LARGE;
} else if (propertiesStreamingMode == CursorStreamingMode.MEDIUM) {
streamingMode = StreamingMode.MEDIUM;
} else if (propertiesStreamingMode == CursorStreamingMode.SMALL) {
streamingMode = StreamingMode.SMALL;
} else if (limit == ReadTransaction.ROW_LIMIT_UNLIMITED) {
streamingMode = StreamingMode.WANT_ALL;
} else {
Expand Down
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.TestRecords1Proto;
import com.apple.foundationdb.record.IndexFetchMethod;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryPlan;
Expand All @@ -28,6 +29,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -83,8 +85,8 @@ void indexPrefetchSplitRecordReverseTest(IndexFetchMethod useIndexPrefetch) thro
}

@ParameterizedTest(name = "indexPrefetchManySplitRecordTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
// TODO: This test actually runs the API in a way that returns results that are too large: Over 50MB
// FDB will fix the issue to limit the bytes returned and then this test would need to adjust accordingly.
int numTransactions = 8;
Expand All @@ -96,7 +98,7 @@ void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws
}
RecordQueryPlan plan = plan(NUM_VALUES_LARGER_THAN_1000_REVERSE, useIndexPrefetch);

executeAndVerifyData(plan, numRecordsPerTransaction * numTransactions, (rec, i) -> {
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), numRecordsPerTransaction * numTransactions, (rec, i) -> {
int primaryKey = 200 + i;
String strValue = ((primaryKey % 2) == 0) ? "even" : "odd";
int numValue = 2000 - i;
Expand Down
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IndexScanType;
Expand All @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,10 +126,10 @@ void indexPrefetchSimpleIndexReverseTest(IndexFetchMethod useIndexPrefetch) thro
* @param useIndexPrefetch the fetch method mode to use
*/
@ParameterizedTest(name = "indexPrefetchPrimaryKeyIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
RecordQueryPlan plan = plan(PRIMARY_KEY_EQUAL, useIndexPrefetch);
executeAndVerifyData(plan, 1, (rec, i) -> {
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 1, (rec, i) -> {
int primaryKey = 1;
String strValue = ((primaryKey % 2) == 0) ? "even" : "odd";
int numValue = 1000 - primaryKey;
Expand All @@ -137,10 +139,11 @@ void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws
}

@ParameterizedTest(name = "indexPrefetchComplexIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
RecordQueryPlan plan = plan(STR_VALUE_EVEN, useIndexPrefetch);
executeAndVerifyData(plan, 50, (rec, i) -> {
// Pass in every supported streaming mode. The result should not change.
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 50, (rec, i) -> {
int primaryKey = i * 2;
int numValue = 1000 - primaryKey;
assertRecord(rec, primaryKey, "even", numValue, "MySimpleRecord$str_value_indexed", "even", primaryKey); // we are filtering out all odd entries, so count*2 are the keys of the even ones
Expand Down
Expand Up @@ -20,8 +20,11 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorIterator;
import com.apple.foundationdb.record.ScanProperties;
Expand All @@ -37,12 +40,14 @@
import com.apple.test.Tags;
import com.google.protobuf.Message;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.provider.Arguments;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Counts.REMOTE_FETCH;
import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Events.SCAN_REMOTE_FETCH_ENTRY;
Expand Down Expand Up @@ -94,6 +99,12 @@ public class RemoteFetchTestBase extends FDBRecordStoreQueryTestBase {
.setFilter(Query.field("rec_no").equalsValue(1L))
.build();

public static Stream<Arguments> fetchMethodAndStreamMode() {
return Stream.of(IndexFetchMethod.values())
.flatMap(indexFetchMethod -> Stream.of(CursorStreamingMode.ITERATOR, CursorStreamingMode.LARGE, CursorStreamingMode.MEDIUM, CursorStreamingMode.SMALL)
.map(streamMode -> Arguments.of(indexFetchMethod, streamMode)));
}

protected void assertRecord(final FDBQueriedRecord<Message> rec, final long primaryKey, final String strValue,
final int numValue, final String indexName, Object indexedValue) {
assertBaseRecord(rec, primaryKey, strValue, numValue, indexName, indexedValue);
Expand Down Expand Up @@ -241,4 +252,14 @@ protected List<FDBQueriedRecord<Message>> scanToList(FDBRecordContext context, S
}
return results;
}

@Nonnull
protected ExecuteProperties serializableWithStreamingMode(final CursorStreamingMode streamingMode) {
final ExecuteProperties execProperties = ExecuteProperties.newBuilder()
.setIsolationLevel(IsolationLevel.SERIALIZABLE)
.setState(ExecuteState.NO_LIMITS)
.setDefaultCursorStreamingMode(streamingMode)
.build();
return execProperties;
}
}

0 comments on commit bf869b8

Please sign in to comment.