-
Notifications
You must be signed in to change notification settings - Fork 101
/
FDBRecordStoreBase.java
2183 lines (2015 loc) · 109 KB
/
FDBRecordStoreBase.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* FDBRecordStoreBase.java
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2015-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.apple.foundationdb.record.provider.foundationdb;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.record.EndpointType;
import com.apple.foundationdb.record.EvaluationContext;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IndexScanType;
import com.apple.foundationdb.record.IndexState;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.PipelineOperation;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCoreStorageException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordFunction;
import com.apple.foundationdb.record.RecordIndexUniquenessViolation;
import com.apple.foundationdb.record.RecordMetaDataBuilder;
import com.apple.foundationdb.record.RecordMetaDataProto;
import com.apple.foundationdb.record.RecordMetaDataProvider;
import com.apple.foundationdb.record.ScanProperties;
import com.apple.foundationdb.record.TupleRange;
import com.apple.foundationdb.record.logging.LogMessageKeys;
import com.apple.foundationdb.record.metadata.Index;
import com.apple.foundationdb.record.metadata.IndexAggregateFunction;
import com.apple.foundationdb.record.metadata.IndexRecordFunction;
import com.apple.foundationdb.record.metadata.Key;
import com.apple.foundationdb.record.metadata.RecordType;
import com.apple.foundationdb.record.metadata.StoreRecordFunction;
import com.apple.foundationdb.record.metadata.expressions.EmptyKeyExpression;
import com.apple.foundationdb.record.metadata.expressions.KeyExpression;
import com.apple.foundationdb.record.provider.common.RecordSerializer;
import com.apple.foundationdb.record.provider.foundationdb.keyspace.KeySpacePath;
import com.apple.foundationdb.record.provider.foundationdb.storestate.FDBRecordStoreStateCache;
import com.apple.foundationdb.record.query.IndexQueryabilityFilter;
import com.apple.foundationdb.record.query.ParameterRelationshipGraph;
import com.apple.foundationdb.record.query.RecordQuery;
import com.apple.foundationdb.record.query.expressions.QueryComponent;
import com.apple.foundationdb.record.query.plan.RecordQueryPlanner;
import com.apple.foundationdb.record.query.plan.plans.QueryResult;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryPlan;
import com.apple.foundationdb.subspace.Subspace;
import com.apple.foundationdb.tuple.Tuple;
import com.apple.foundationdb.tuple.TupleHelpers;
import com.google.protobuf.Message;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
/**
* Base interface for typed and untyped record stores.
*
* This interface is the main front-end for most operations inserting, modifying, or querying data through
* the Record Layer. A record store combines:
*
* <ul>
* <li>A {@link Subspace} (often specified via a {@link KeySpacePath})</li>
* <li>The {@link com.apple.foundationdb.record.RecordMetaData RecordMetaData} associated with the data stored with the data in that subspace</li>
* <li>An {@link FDBRecordContext} which wraps a FoundationDB {@link com.apple.foundationdb.Transaction Transaction}</li>
* </ul>
*
* <p>
* All of the record store's data—including index data—are stored durably within the given subspace. Note that
* the meta-data is <em>not</em> stored by the record store directly. However, information about the store's current meta-data
* version is persisted with the store to detect when the meta-data have changed and to know if any action needs to be taken
* to begin using the new meta-data. (For example, new indexes might need to be built and removed indexes deleted.) The same
* meta-data may be used for multiple record stores, and separating the meta-data from the data makes updating the shared
* meta-data simpler as it only needs to be updated in one place. The {@link FDBMetaDataStore} may be used if one wishes
* to persist the meta-data into a FoundationDB cluster.
* </p>
*
* <p>
* All operations conducted by a record store are conducted within the lifetime single transaction, and no data is persisted
* to the database until the transaction is committed by calling {@link FDBRecordContext#commit()} or
* {@link FDBRecordContext#commitAsync()}. Record Layer transactions inherit all of the guarantees and limitations of
* the transactions exposed by FoundationDB, including their durability and consistency guarantees as well as size and
* duration limits. See the FoundationDB <a href="https://apple.github.io/foundationdb/known-limitations.html">known limitations</a>
* for more details.
* </p>
*
* <p>
* The record store also allows the user to tweak additional parameters such as what the parallelism of pipelined operations
* should be (through the {@link PipelineSizer}) and what serializer should be used to read and write data to the database.
* See the {@link BaseBuilder} interface for more details.
* </p>
*
* @param <M> type used to represent stored records
* @see FDBRecordStore
* @see FDBTypedRecordStore
*/
@API(API.Status.MAINTAINED)
public interface FDBRecordStoreBase<M extends Message> extends RecordMetaDataProvider {
/**
* Get the untyped record store associated with this possibly typed store.
* @return an untyped record store
*/
FDBRecordStore getUntypedRecordStore();
/**
* Get a typed record store using the given typed serializer.
* @param <N> the type for the new record store
* @param serializer typed serializer to use
* @return a new typed record store
*/
default <N extends Message> FDBTypedRecordStore<N> getTypedRecordStore(@Nonnull RecordSerializer<N> serializer) {
return new FDBTypedRecordStore<>(getUntypedRecordStore(), serializer);
}
/**
* Get the record context (transaction) to use for the record store.
* @return context the record context / transaction to use
*/
@Nonnull
FDBRecordContext getContext();
@Nonnull
default Executor getExecutor() {
return getContext().getExecutor();
}
@Nullable
default FDBStoreTimer getTimer() {
return getContext().getTimer();
}
/**
* Get the subspace provider.
* @return the subspace provider
*/
@Nullable
SubspaceProvider getSubspaceProvider();
/**
* Get the serializer used to convert records into byte arrays.
* @return the serializer to use
*/
@Nonnull
RecordSerializer<M> getSerializer();
/**
* Hook for checking if store state for client changes.
*/
interface UserVersionChecker {
/**
* Check the user version.
* <p>
* If this store is being created for the first time, the store header will be set to the default instance. In
* particular, the format version will be zero, whereas the format version on all stores that have already been
* created will be greater than zero.
*
* @param storeHeader store header
* @param metaData the meta-data provider that will be used to get meta-data
*
* @return the user version to store in the record info header
*/
@SuppressWarnings("deprecation")
default CompletableFuture<Integer> checkUserVersion(@Nonnull RecordMetaDataProto.DataStoreInfo storeHeader,
RecordMetaDataProvider metaData) {
final boolean newStore = storeHeader.getFormatVersion() == 0;
final int oldMetaDataVersion = newStore ? -1 : storeHeader.getMetaDataversion();
final int oldUserVersion = newStore ? -1 : storeHeader.getUserVersion();
return checkUserVersion(oldUserVersion, oldMetaDataVersion, metaData);
}
/**
* Check the user version.
* <p>
* This method is not called on classes that override
* {@link #checkUserVersion(RecordMetaDataProto.DataStoreInfo, RecordMetaDataProvider)}. Such implementations
* can just throw an exception from here.
*
* @param oldUserVersion the old user version or <code>-1</code> if this is a new record store
* @param oldMetaDataVersion the old meta-data version
* @param metaData the meta-data provider that will be used to get meta-data
*
* @return the user version to store in the record info header
*
* @deprecated use {@link #checkUserVersion(RecordMetaDataProto.DataStoreInfo, RecordMetaDataProvider)} instead
*/
@API(API.Status.DEPRECATED)
@Deprecated
CompletableFuture<Integer> checkUserVersion(int oldUserVersion, int oldMetaDataVersion,
RecordMetaDataProvider metaData);
/**
* Determine what to do about an index needing to be built. When a {@link FDBRecordStore} is opened,
* this method will be called on any index that has been added to the
* {@link com.apple.foundationdb.record.RecordMetaData RecordMetaData} since the last time the record store was
* opened. The index will then be initialized with the {@link IndexState} returned, which in turn determines
* whether the index must be maintained during record inserts and deletes and also whether the index
* can be read (for queries, for example). In general, an index is only really useful if it is
* {@link IndexState#READABLE}, but if this method returns {@link IndexState#READABLE}, then the
* index must be built in the same transaction that opens the record store, which can lead to errors
* on large stores if the index cannot be built in the
* <a href="https://apple.github.io/foundationdb/known-limitations.html#long-running-transactions">five second
* FoundationDB transaction time limit</a>.
*
* <p>
* By default, this will return {@link IndexState#READABLE} for any indexes on new types (which
* can be used right away without doing any I/O) or if the number of records in the store is small (below
* {@link FDBRecordStore#MAX_RECORDS_FOR_REBUILD}). However, if the record store is large, this will return
* {@link IndexState#DISABLED}, which indicates that the index should not be maintained and that it cannot be
* used until the index is built by the {@link OnlineIndexer}.
* </p>
*
* <p>
* For adopters, two utility methods are provided that can be used to make implementing this method easier.
* The first is {@link FDBRecordStore#disabledIfTooManyRecordsForRebuild(long, boolean)}, which replicates
* the default behavior. The second is
* {@link FDBRecordStore#writeOnlyIfTooManyRecordsForRebuild(long, boolean)}, which is similar to the default
* except that it returns {@link IndexState#WRITE_ONLY} instead of {@link IndexState#DISABLED} and was the
* default prior to Record Layer 3.0. Note that all indexes must be made {@link IndexState#WRITE_ONLY} before
* they can be built, but the {@link OnlineIndexer} should generally handle that index state transition, and so
* most adopters should return {@link IndexState#DISABLED} on indexes that cannot be built in-line.
* </p>
*
* @param index the index that has not been built for this store
* @param recordCount the number of records already in the store
* @param indexOnNewRecordTypes <code>true</code> if all record types for the index are new (the number of
* records related to this index is 0), in which case the index is able to be
* "rebuilt" instantly with no cost.
* @return the desired state of the new index. If this is {@link IndexState#READABLE}, the index will be built right away
* @see FDBRecordStore#disabledIfTooManyRecordsForRebuild(long, boolean)
* @see FDBRecordStore#writeOnlyIfTooManyRecordsForRebuild(long, boolean)
*/
default IndexState needRebuildIndex(Index index, long recordCount, boolean indexOnNewRecordTypes) {
return FDBRecordStore.disabledIfTooManyRecordsForRebuild(recordCount, indexOnNewRecordTypes);
}
/**
* Determine what to do about an index needing to be rebuilt. For more information about when this method
* is called and what the return value is used for, see {@link #needRebuildIndex(Index, long, boolean)}.
*
* <p>
* This method takes the record count and {@linkplain FDBRecordStore#estimateRecordsSizeAsync() estimated size}
* as parameters. Implementors can choose to use either value (or neither) when determining whether an index
* should be built in-line (i.e., whether to return {@link IndexState#READABLE}) when the meta-data on a store
* is upgraded. For record types on which an appropriate
* {@linkplain com.apple.foundationdb.record.metadata.IndexTypes#COUNT count} index is either not defined or
* for which scanning the count index would be too much work due to grouping keys (see
* <a href="https://github.com/foundationDB/fdb-record-layer/issues/7">Issue #7</a>), it may be more efficient
* to base the indexing decision on the size estimate alone.
* </p>
*
* <p>
* Both the record count and size estimate parameter are specified via suppliers that will not evaluate the
* count or size until requested. If a value is requested from either supplier, the returned future should
* complete after the future returned by the supplier.
* </p>
*
* <p>
* By default, this will call {@link #needRebuildIndex(Index, long, boolean)} with the record count returned
* by {@code lazyRecordCount}, so adopters who want to use the record count but not the records size estimate
* only need to implement that function.
* </p>
*
* @param index the index that has not been built for this store
* @param lazyRecordCount a supplier that will return a future with the number of records already in the store
* @param lazyEstimatedSize a supplier that will return a future that will resolve to an estimate of the size of
* the store in bytes
* @param indexOnNewRecordTypes <code>true</code> if all record types for the index are new (the number of
* records related to this index is 0), in which case the index is able to be
* "rebuilt" instantly with no cost.
* @return a future that will complete to the desired state of the new index
* @see #needRebuildIndex(Index, long, boolean)
*/
@API(API.Status.EXPERIMENTAL)
@Nonnull
default CompletableFuture<IndexState> needRebuildIndex(Index index,
Supplier<CompletableFuture<Long>> lazyRecordCount,
Supplier<CompletableFuture<Long>> lazyEstimatedSize,
boolean indexOnNewRecordTypes) {
return lazyRecordCount.get()
.thenApply(recordCount -> needRebuildIndex(index, recordCount, indexOnNewRecordTypes));
}
}
/**
* Action to take if the record store does / does not already exist.
* @see FDBRecordStore.Builder#createOrOpenAsync(FDBRecordStoreBase.StoreExistenceCheck)
*/
enum StoreExistenceCheck {
/**
* No special action.
*
* This should be used with care, since if the record store already has records, there is
* no guarantee that they were written at the current versions (meta-data and format).
* It is really only appropriate in development when switching from {@code uncheckedOpen}
* or {@code build} to a checked open.
*/
NONE,
/**
* Throw if the record store does not have an info header but does have have at least one
* record. This differs from {@link #ERROR_IF_NO_INFO_AND_NOT_EMPTY} in that there is
* data stored in the record store other than just the records and the indexes, including
* meta-data about which indexes have been built. A record store that is missing a header
* but has this other data is in a corrupt state, but as there are no records, it can be
* recovered when creating the store in a straightforward way.
*/
ERROR_IF_NO_INFO_AND_HAS_RECORDS_OR_INDEXES,
/**
* Throw if the record store does not have an info header but is not empty. Unlike with
* {@link #ERROR_IF_NO_INFO_AND_HAS_RECORDS_OR_INDEXES}, this existence check will throw an
* error even if there are no records in the store, only data stored internally by the
* Record Layer.
*
* This corresponds to {@link FDBRecordStore.Builder#createOrOpen}
*/
ERROR_IF_NO_INFO_AND_NOT_EMPTY,
/**
* Throw if the record store already exists.
*
* This corresponds to {@link FDBRecordStore.Builder#create}
* @see RecordStoreAlreadyExistsException
*/
ERROR_IF_EXISTS,
/**
* Throw if the record store does not already exist.
*
* This corresponds to {@link FDBRecordStore.Builder#open}
* @see RecordStoreDoesNotExistException
*/
ERROR_IF_NOT_EXISTS
}
/**
* Action to take if the record being saved does / does not already exist.
* @see FDBRecordStoreBase#saveRecordAsync(Message, RecordExistenceCheck)
*/
enum RecordExistenceCheck {
/**
* No special action.
*
* This corresponds to {@link FDBRecordStoreBase#saveRecord}
*/
NONE,
/**
* Throw if the record already exists.
*
* This corresponds to {@link FDBRecordStoreBase#insertRecord}
* @see RecordAlreadyExistsException
*/
ERROR_IF_EXISTS,
/**
* Throw if the record does not already exist.
*
* @see RecordDoesNotExistException
*/
ERROR_IF_NOT_EXISTS,
/**
* Throw if an existing record has a different record type.
*
* @see RecordTypeChangedException
*/
ERROR_IF_RECORD_TYPE_CHANGED,
/**
* Throw if the record does not already exist or has a different record type.
*
* This corresponds to {@link FDBRecordStoreBase#updateRecord}
* @see RecordDoesNotExistException
* @see RecordTypeChangedException
*/
ERROR_IF_NOT_EXISTS_OR_RECORD_TYPE_CHANGED;
public boolean errorIfExists() {
return this == ERROR_IF_EXISTS;
}
public boolean errorIfNotExists() {
return this == ERROR_IF_NOT_EXISTS || this == ERROR_IF_NOT_EXISTS_OR_RECORD_TYPE_CHANGED;
}
public boolean errorIfTypeChanged() {
return this == ERROR_IF_RECORD_TYPE_CHANGED || this == ERROR_IF_NOT_EXISTS_OR_RECORD_TYPE_CHANGED;
}
}
/**
* Provided during record save (via {@link #saveRecord(Message, FDBRecordVersion, VersionstampSaveBehavior)}),
* directs the behavior of the save w.r.t. the record's version.
* In the presence of a version, either {@code DEFAULT} or {@code WITH_VERSION} can be used.
* For safety, <code>NO_VERSION</code> should only be used with a null version.
*/
enum VersionstampSaveBehavior {
/**
* Match the behavior dictated by the meta-data. If {@link com.apple.foundationdb.record.RecordMetaData#isStoreRecordVersions()}
* returns {@code true}, this will always store the record with a version (like {@link #WITH_VERSION}). Otherwise,
* it will store the record with the provided version if given or with no version if not.
*/
DEFAULT,
/**
* Do not save the record with a version. If a non-null version is provided to {@link #saveRecord(Message, FDBRecordVersion, VersionstampSaveBehavior)},
* then an error will be thrown.
*/
NO_VERSION,
/**
* Always save the record with a version. If a null version is provided, then the record store will chose
* a new version.
*
* <p>
* Note: due to <a href="https://github.com/FoundationDB/fdb-record-layer/issues/964">Issue #964</a>, on some
* older record stores, namely those that were originally created with a {@linkplain FDBRecordStore#getFormatVersion()
* format version} below {@link FDBRecordStore#SAVE_VERSION_WITH_RECORD_FORMAT_VERSION}, records written with a
* version on stores where {@link com.apple.foundationdb.record.RecordMetaData#isStoreRecordVersions()} is
* {@code false} will not return the version with a record when read, even though the version will be stored.
* Users can avoid this by either migrating data to a new store or by setting {@code isStoreRecordVersions()}
* to {@code true} in the meta-data and then supplying the {@link #NO_VERSION} when saving any records that
* do not need an associated version.
* </p>
*/
WITH_VERSION,
/**
* Save a record with a version if and only if a non-null version is passed to {@link #saveRecord(Message, FDBRecordVersion, VersionstampSaveBehavior)}.
* In this mode, the record store will never assign a version to the record, but it will always use the
* version provided (or store the record with no version if {@code null}). This is useful if one is copying
* data from one record store to another and one wants to preserve the versions (including non-versions) for each
* record.
*/
IF_PRESENT,
}
/**
* Async version of {@link #saveRecord(Message)}.
* @param rec the record to save
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> saveRecordAsync(@Nonnull final M rec) {
return saveRecordAsync(rec, (FDBRecordVersion)null);
}
/**
* Async version of {@link #saveRecord(Message, RecordExistenceCheck)}.
* @param rec the record to save
* @param existenceCheck when to throw an exception if a record with the same primary key does or does not already exist
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> saveRecordAsync(@Nonnull final M rec, @Nonnull RecordExistenceCheck existenceCheck) {
return saveRecordAsync(rec, existenceCheck, null, VersionstampSaveBehavior.DEFAULT);
}
/**
* Async version of {@link #saveRecord(Message, FDBRecordVersion)}.
* @param rec the record to save
* @param version the associated record version
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> saveRecordAsync(@Nonnull final M rec, @Nullable FDBRecordVersion version) {
return saveRecordAsync(rec, version, VersionstampSaveBehavior.DEFAULT);
}
/**
* Async version of {@link #saveRecord(Message, FDBRecordVersion, VersionstampSaveBehavior)}.
* @param rec the record to save
* @param version the associated record version
* @param behavior the save behavior w.r.t. the given <code>version</code>
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> saveRecordAsync(@Nonnull final M rec, @Nullable FDBRecordVersion version, @Nonnull final VersionstampSaveBehavior behavior) {
return saveRecordAsync(rec, RecordExistenceCheck.NONE, version, behavior);
}
/**
* Async version of {@link #saveRecord(Message, RecordExistenceCheck, FDBRecordVersion, VersionstampSaveBehavior)}.
* @param rec the record to save
* @param existenceCheck when to throw an exception if a record with the same primary key does or does not already exist
* @param version the associated record version
* @param behavior the save behavior w.r.t. the given <code>version</code>
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
CompletableFuture<FDBStoredRecord<M>> saveRecordAsync(@Nonnull M rec, @Nonnull RecordExistenceCheck existenceCheck,
@Nullable FDBRecordVersion version, @Nonnull VersionstampSaveBehavior behavior);
/**
* Save the given record.
* @param rec the record to be saved
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> saveRecord(@Nonnull final M rec) {
return saveRecord(rec, (FDBRecordVersion)null);
}
/**
* Save the given record.
* @param rec the record to be saved
* @param existenceCheck when to throw an exception if a record with the same primary key does or does not already exist
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> saveRecord(@Nonnull final M rec, @Nonnull RecordExistenceCheck existenceCheck) {
return saveRecord(rec, existenceCheck, null, VersionstampSaveBehavior.DEFAULT);
}
/**
* Save the given record with a specific version. If <code>null</code>
* is passed for <code>version</code>, then a new version is
* created that will be unique for this record.
* @param rec the record to be saved
* @param version the version to associate with the record when saving
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> saveRecord(@Nonnull final M rec, @Nullable final FDBRecordVersion version) {
return saveRecord(rec, version, VersionstampSaveBehavior.DEFAULT);
}
/**
* Save the given record with a specific version.
* The version is handled according to the behavior value. If behavior is <code>DEFAULT</code> then
* the method acts as {@link #saveRecord(Message, FDBRecordVersion)}. If behavior is <code>NO_VERSION</code> then
* <code>version</code> is ignored and no version is saved. If behavior is <code>WITH_VERSION</code> then the value
* of <code>version</code> is stored as given by the caller.
* @param rec the record to be saved
* @param version the version to associate with the record when saving
* @param behavior the save behavior w.r.t. the given <code>version</code>
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> saveRecord(@Nonnull final M rec, @Nullable final FDBRecordVersion version, @Nonnull final VersionstampSaveBehavior behavior) {
return saveRecord(rec, RecordExistenceCheck.NONE, version, behavior);
}
/**
* Save the given record with a specific version.
* The version is handled according to the behavior value. If behavior is <code>DEFAULT</code> then
* the method acts as {@link #saveRecord(Message, FDBRecordVersion)}. If behavior is <code>NO_VERSION</code> then
* <code>version</code> is ignored and no version is saved. If behavior is <code>WITH_VERSION</code> then the value
* of <code>version</code> is stored as given by the caller.
* @param rec the record to be saved
* @param existenceCheck when to throw an exception if a record with the same primary key does or does not already exist
* @param version the version to associate with the record when saving
* @param behavior the save behavior w.r.t. the given <code>version</code>
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> saveRecord(@Nonnull final M rec, @Nonnull RecordExistenceCheck existenceCheck,
@Nullable final FDBRecordVersion version, @Nonnull final VersionstampSaveBehavior behavior) {
return getContext().asyncToSync(FDBStoreTimer.Waits.WAIT_SAVE_RECORD, saveRecordAsync(rec, existenceCheck, version, behavior));
}
/**
* Save the given record and throw an exception if a record already exists with the same primary key.
* @param rec the record to be saved
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> insertRecordAsync(@Nonnull final M rec) {
return saveRecordAsync(rec, RecordExistenceCheck.ERROR_IF_EXISTS);
}
/**
* Save the given record and throw an exception if a record already exists with the same primary key.
* @param rec the record to be saved
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> insertRecord(@Nonnull final M rec) {
return saveRecord(rec, RecordExistenceCheck.ERROR_IF_EXISTS);
}
/**
* Save the given record and throw an exception if the record does not already exist in the database.
* @param rec the record to be saved
* @return a future that completes with the stored record form of the saved record
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> updateRecordAsync(@Nonnull final M rec) {
return saveRecordAsync(rec, RecordExistenceCheck.ERROR_IF_NOT_EXISTS_OR_RECORD_TYPE_CHANGED);
}
/**
* Save the given record and throw an exception if the record does not already exist in the database.
* @param rec the record to be saved
* @return wrapping object containing saved record and metadata
*/
@Nonnull
default FDBStoredRecord<M> updateRecord(@Nonnull final M rec) {
return saveRecord(rec, RecordExistenceCheck.ERROR_IF_NOT_EXISTS_OR_RECORD_TYPE_CHANGED);
}
/**
* Load the record with the given primary key.
* @param primaryKey the primary key for the record
* @return a {@link FDBStoredRecord} for the record or <code>null</code>.
*/
@Nullable
default FDBStoredRecord<M> loadRecord(@Nonnull final Tuple primaryKey) {
return getContext().asyncToSync(FDBStoreTimer.Waits.WAIT_LOAD_RECORD, loadRecordAsync(primaryKey));
}
/**
* Load the record with the given primary key.
* @param primaryKey the primary key for the record
* @param snapshot whether to load at snapshot isolation
* @return a {@link FDBStoredRecord} for the record or <code>null</code>.
*/
@Nullable
default FDBStoredRecord<M> loadRecord(@Nonnull final Tuple primaryKey, final boolean snapshot) {
return getContext().asyncToSync(FDBStoreTimer.Waits.WAIT_LOAD_RECORD, loadRecordAsync(primaryKey, snapshot));
}
/**
* Asynchronously load a record.
* @param primaryKey the key for the record to be loaded
* @return a CompletableFuture that will return a message or null if there was no record with that key
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> loadRecordAsync(@Nonnull final Tuple primaryKey) {
return loadRecordAsync(primaryKey, false);
}
/**
* Asynchronously load a record.
* @param primaryKey the key for the record to be loaded
* @param snapshot whether to load at snapshot isolation
* @return a CompletableFuture that will return a message or null if there was no record with that key
*/
@Nonnull
default CompletableFuture<FDBStoredRecord<M>> loadRecordAsync(@Nonnull final Tuple primaryKey, final boolean snapshot) {
return loadRecordInternal(primaryKey, ExecuteState.NO_LIMITS, snapshot);
}
@Nonnull
@API(API.Status.INTERNAL)
CompletableFuture<FDBStoredRecord<M>> loadRecordInternal(@Nonnull Tuple primaryKey, @Nonnull ExecuteState executeState, boolean snapshot);
/**
* Get record into FDB RYW cache.
* Caller needs to hold on to result until ready or else there is a chance it will get
* GC'ed and cancelled before then.
* @param primaryKey the primary key for the record to retrieve
* @return a future that will return {@code null} when the record is preloaded
*/
@Nonnull
CompletableFuture<Void> preloadRecordAsync(@Nonnull Tuple primaryKey);
/**
* Check if a record exists in the record store with the given primary key.
* This performs its reads at the {@link IsolationLevel#SERIALIZABLE} isolation level.
*
* @param primaryKey the primary key of the record
* @return a future that will complete to <code>true</code> if some record in record store has that primary key and
* <code>false</code> otherwise
* @see #recordExistsAsync(Tuple, IsolationLevel)
*/
@Nonnull
default CompletableFuture<Boolean> recordExistsAsync(@Nonnull final Tuple primaryKey) {
return recordExistsAsync(primaryKey, IsolationLevel.SERIALIZABLE);
}
/**
* Check if a record exists in the record store with the given primary key.
* This is slightly more efficient than loading the record and checking if that record is <code>null</code>
* as it does not have to deserialize the record, though the record's contents are still read from the
* database and sent over the network.
*
* @param primaryKey the primary key of the record
* @param isolationLevel the isolation level to use when reading
* @return a future that will complete to <code>true</code> if some record in record store has that primary key and
* <code>false</code> otherwise
*/
@Nonnull
CompletableFuture<Boolean> recordExistsAsync(@Nonnull Tuple primaryKey, @Nonnull IsolationLevel isolationLevel);
/**
* Check if a record exists in the record store with the given primary key.
* This method is blocking. For the non-blocking version of this method, see {@link #recordExistsAsync(Tuple)}.
*
* @param primaryKey the primary key of the record
* @return <code>true</code> if some record in record store has that primary key and <code>false</code> otherwise
* @see #recordExistsAsync(Tuple)
*/
default boolean recordExists(@Nonnull final Tuple primaryKey) {
return getContext().asyncToSync(FDBStoreTimer.Waits.WAIT_RECORD_EXISTS, recordExistsAsync(primaryKey));
}
/**
* Check if a record exists in the record store with the given primary key.
* This method is blocking. For the non-blocking version of this method, see {@link #recordExistsAsync(Tuple, IsolationLevel)}.
*
* @param primaryKey the primary key of the record
* @param isolationLevel the isolation level to use when reading
* @return <code>true</code> if some record in record store has that primary key and <code>false</code> otherwise
* @see #recordExistsAsync(Tuple)
*/
default boolean recordExists(@Nonnull final Tuple primaryKey, @Nonnull final IsolationLevel isolationLevel) {
return getContext().asyncToSync(FDBStoreTimer.Waits.WAIT_RECORD_EXISTS, recordExistsAsync(primaryKey, isolationLevel));
}
/**
* Add a read conflict as if one had read the record with the given primary key. This will cause this transaction
* to fail (with a {@link com.apple.foundationdb.record.provider.foundationdb.FDBExceptions.FDBStoreTransactionConflictException})
* if a concurrent transaction modifies the record with the provided primary key. This call however does not require
* performing any reads against the database, so it is faster and cheaper to perform than a real read. Note also that
* read-only operations are not checked for conflicts, so if this method is called, but the transaction performs
* no mutations, the transaction will never be failed with the above exception. Note also that this does not
* check that a record with this primary key actually exists in the database.
*
* <p>
* One use case is that this can be used to promote a read from {@link IsolationLevel#SNAPSHOT} to
* {@link IsolationLevel#SERIALIZABLE}. For example, if one performs a query at {@link IsolationLevel#SNAPSHOT} and
* then uses a subset of the records to determine a few other writes, then one can add conflicts to <em>only</em>
* the records actually used.
* </p>
*
* <p>
* This method should be used with care and is advised only for those users who need extra control over conflict
* ranges.
* </p>
*
* @param primaryKey the primary key of the record to add a read conflict on
* @see com.apple.foundationdb.Transaction#addReadConflictRange(byte[], byte[])
*/
void addRecordReadConflict(@Nonnull Tuple primaryKey);
/**
* Add a write conflict as if one had modified the record with the given primary key. This will cause any concurrent
* transactions to fail (with a {@link com.apple.foundationdb.record.provider.foundationdb.FDBExceptions.FDBStoreTransactionConflictException})
* if they read the record with the provided primary key. This call however does not require performing any writes
* against the database, so it is faster and cheaper to perform than a real write. Note that this does not check
* if a record with this primary key actually exists in the database, and it does not update any indexes associated
* with the record. In this way, it is identical (in terms of conflicts) with overwriting the given record with itself,
* though it will not induce any disk I/O or cause any {@linkplain com.apple.foundationdb.Transaction#watch(byte[]) watches}
* on the modified keys to fire.
*
* <p>
* This method should be used with care and is advised only for those users who need extra control over conflict
* ranges.
* </p>
*
* @param primaryKey the primary key of the record to add a write conflict on
* @see com.apple.foundationdb.Transaction#addWriteConflictRange(byte[], byte[])
*/
void addRecordWriteConflict(@Nonnull Tuple primaryKey);
/**
* Scan the records in the database.
*
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
*
* @return a cursor that will scan everything in the range, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
default RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
return scanRecords(null, null, EndpointType.TREE_START, EndpointType.TREE_END, continuation, scanProperties);
}
/**
* Scan the records in the database in a range.
*
* @param range the range to scan
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
*
* @return a cursor that will scan everything in the range, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
default RecordCursor<FDBStoredRecord<M>> scanRecords(@Nonnull TupleRange range, @Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
return scanRecords(range.getLow(), range.getHigh(), range.getLowEndpoint(), range.getHighEndpoint(), continuation, scanProperties);
}
/**
* Scan the records in the database in a range.
*
* @param low low point of scan range
* @param high high point of scan point
* @param lowEndpoint whether low point is inclusive or exclusive
* @param highEndpoint whether high point is inclusive or exclusive
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
*
* @return a cursor that will scan everything in the range, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable Tuple low, @Nullable Tuple high,
@Nonnull EndpointType lowEndpoint, @Nonnull EndpointType highEndpoint,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties);
/**
* Count the number of records in the database in a range.
*
* @param low low point of scan range
* @param high high point of scan point
* @param lowEndpoint whether low point is inclusive or exclusive
* @param highEndpoint whether high point is inclusive or exclusive
*
* @return a future that will complete with the number of records in the range
*/
@Nonnull
default CompletableFuture<Integer> countRecords(@Nullable Tuple low, @Nullable Tuple high,
@Nonnull EndpointType lowEndpoint, @Nonnull EndpointType highEndpoint) {
return countRecords(low, high, lowEndpoint, highEndpoint, null, ScanProperties.FORWARD_SCAN);
}
/**
* Count the number of records in the database in a range.
*
* @param low low point of scan range
* @param high high point of scan point
* @param lowEndpoint whether low point is inclusive or exclusive
* @param highEndpoint whether high point is inclusive or exclusive
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
*
* @return a future that will complete with the number of records in the range
*/
@Nonnull
CompletableFuture<Integer> countRecords(@Nullable Tuple low, @Nullable Tuple high,
@Nonnull EndpointType lowEndpoint, @Nonnull EndpointType highEndpoint,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties);
/**
* Scan the entries in an index.
* @param index the index to scan
* @param scanBounds the scan to preform
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
* @return a cursor that will scan the index, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
RecordCursor<IndexEntry> scanIndex(@Nonnull Index index, @Nonnull IndexScanBounds scanBounds, @Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties);
/**
* Scan the entries in an index.
* @param index the index to scan
* @param scanType the type of scan to perform
* @param range range to scan
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
* @return a cursor that will scan the index, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
default RecordCursor<IndexEntry> scanIndex(@Nonnull Index index, @Nonnull IndexScanType scanType,
@Nonnull TupleRange range, @Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties) {
return scanIndex(index, new IndexScanRange(scanType, range), continuation, scanProperties);
}
/**
* Scan the records pointed to by an index.
* @param indexName the name of the index
* @return a cursor that return records pointed to by the index
*/
@Nonnull
default RecordCursor<FDBIndexedRecord<M>> scanIndexRecords(@Nonnull final String indexName) {
return scanIndexRecords(indexName, IsolationLevel.SERIALIZABLE);
}
/**
* Scan the records pointed to by an index.
* @param indexName the name of the index
* @param isolationLevel the isolation level to use when reading
* @return a cursor that return records pointed to by the index
*/
@Nonnull
default RecordCursor<FDBIndexedRecord<M>> scanIndexRecords(@Nonnull final String indexName, IsolationLevel isolationLevel) {
return scanIndexRecords(indexName, IndexScanType.BY_VALUE, TupleRange.ALL, null,
new ScanProperties(ExecuteProperties.newBuilder().setIsolationLevel(isolationLevel).build()));
}
/**
* Scan the records pointed to by an index.
* @param indexName the name of the index
* @param scanType the type of scan to perform
* @param range the range of the index to scan
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
* @return a cursor that return records pointed to by the index
*/
@Nonnull
default RecordCursor<FDBIndexedRecord<M>> scanIndexRecords(@Nonnull final String indexName,
@Nonnull final IndexScanType scanType,
@Nonnull final TupleRange range,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties) {
return scanIndexRecords(indexName, scanType, range, continuation, IndexOrphanBehavior.ERROR, scanProperties);
}
/**
* Scan the records pointed to by an index.
* @param indexName the name of the index
* @param scanType the type of scan to perform
* @param range the range of the index to scan
* @param continuation any continuation from a previous scan
* @param orphanBehavior how the iteration process should respond in the face of entries in the index for which
* there is no associated record
* @param scanProperties skip, limit and other scan properties
* @return a cursor that return records pointed to by the index
*/
@Nonnull
default RecordCursor<FDBIndexedRecord<M>> scanIndexRecords(@Nonnull final String indexName,
@Nonnull final IndexScanType scanType,
@Nonnull final TupleRange range,
@Nullable byte[] continuation,
@Nonnull IndexOrphanBehavior orphanBehavior,
@Nonnull ScanProperties scanProperties) {
final Index index = getRecordMetaData().getIndex(indexName);
return fetchIndexRecords(scanIndex(index, scanType, range, continuation, scanProperties), orphanBehavior,
scanProperties.getExecuteProperties().getState());
}
/**
* Scan the records pointed to by an index, using a single scan-and-dereference FDB operation.
* @param indexName the name of the index
* @param scanBounds the range of the index to scan
* @param commonPrimaryKey the common primary key for the records that would be returned
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
* @return a cursor that return records pointed to by the index
*/
@Nonnull
@API(API.Status.EXPERIMENTAL)
default RecordCursor<FDBIndexedRecord<M>> scanIndexRemoteFetch(@Nonnull final String indexName,
@Nonnull final IndexScanBounds scanBounds,
@Nonnull final KeyExpression commonPrimaryKey,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties,
@Nonnull final IndexOrphanBehavior orphanBehavior) {
final Index index = getRecordMetaData().getIndex(indexName);
return scanIndexRemoteFetch(index, scanBounds, commonPrimaryKey, continuation, scanProperties, orphanBehavior);
}
/**
* Scan the records pointed to by an index, using a single scan-and-dereference FDB operation.
* @param index the index to scan
* @param scanBounds the range for the index to scan
* @param commonPrimaryKey the common primary key for the records that would be returned
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
* @return a cursor that return records pointed to by the index
*/
@Nonnull
@API(API.Status.EXPERIMENTAL)
RecordCursor<FDBIndexedRecord<M>> scanIndexRemoteFetch(@Nonnull Index index,
@Nonnull IndexScanBounds scanBounds,
@Nonnull KeyExpression commonPrimaryKey,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties,
@Nonnull IndexOrphanBehavior orphanBehavior);
/**
* Given a cursor that iterates over entries in an index, attempts to fetch the associated records for those entries.
*
* @param indexCursor a cursor iterating over entries in the index
* @param orphanBehavior how the iteration process should respond in the face of entries in the index for which
* there is no associated record
* @return a cursor returning indexed record entries
*/
@Nonnull
default RecordCursor<FDBIndexedRecord<M>> fetchIndexRecords(@Nonnull RecordCursor<IndexEntry> indexCursor,
@Nonnull IndexOrphanBehavior orphanBehavior) {
return fetchIndexRecords(indexCursor, orphanBehavior, ExecuteState.NO_LIMITS);
}
/**