diff --git a/Makefile b/Makefile
index d9be018f969d..069a66d050ad 100644
--- a/Makefile
+++ b/Makefile
@@ -997,13 +997,11 @@ $(go-targets): override LINKFLAGS += \
 $(COCKROACH) $(COCKROACHOSS) go-install: override LINKFLAGS += \
     -X "github.com/cockroachdb/cockroach/pkg/build.utcTime=$(shell date -u '+%Y/%m/%d %H:%M:%S')"
 
-settings-doc-gen = $(if $(filter buildshort,$(MAKECMDGOALS)),$(COCKROACHSHORT),$(COCKROACH))
+docs/generated/settings/settings.html: $(COCKROACHSHORT)
+    @$(COCKROACHSHORT) gen settings-list --format=rawhtml > $@
 
-docs/generated/settings/settings.html: $(settings-doc-gen)
-    @$(settings-doc-gen) gen settings-list --format=rawhtml > $@
-
-docs/generated/settings/settings-for-tenants.txt: $(settings-doc-gen)
-    @$(settings-doc-gen) gen settings-list --without-system-only > $@
+docs/generated/settings/settings-for-tenants.txt: $(COCKROACHSHORT)
+    @$(COCKROACHSHORT) gen settings-list --without-system-only > $@
 
 SETTINGS_DOC_PAGES := docs/generated/settings/settings.html docs/generated/settings/settings-for-tenants.txt

diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md
index 17664eac7b70..5cbedb9784a6 100644
--- a/docs/generated/sql/functions.md
+++ b/docs/generated/sql/functions.md
@@ -3018,7 +3018,7 @@ may increase either contention or retry errors, or both.

Immutable

crdb_internal.assignment_cast(val: anyelement, type: anyelement) → anyelement

This function is used internally to perform assignment casts during mutations.

Stable

-crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) → tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail}

Runs a consistency check on ranges touching the specified key range. An empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.

+crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) → tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail, interval AS duration}

Runs a consistency check on ranges touching the specified key range. An empty start or end key is treated as the minimum and maximum possible, respectively. stats_only should only be set to false when targeting a small number of ranges to avoid overloading the cluster. Each returned row contains the range ID, the status (a roachpb.CheckConsistencyResponse_Status), and verbose detail.

Example usage: SELECT * FROM crdb_internal.check_consistency(true, '\x02', '\x04')

Volatile

diff --git a/pkg/ccl/testccl/workload/schemachange/BUILD.bazel b/pkg/ccl/testccl/workload/schemachange/BUILD.bazel
index 9a9e19162541..b5e6e0246d16 100644
--- a/pkg/ccl/testccl/workload/schemachange/BUILD.bazel
+++ b/pkg/ccl/testccl/workload/schemachange/BUILD.bazel
@@ -20,6 +20,7 @@ go_test(
         "//pkg/security/securitytest",
         "//pkg/server",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/skip",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",

diff --git a/pkg/ccl/testccl/workload/schemachange/schema_change_external_test.go b/pkg/ccl/testccl/workload/schemachange/schema_change_external_test.go
index 1671b3cd3129..9c8d1676cc2e 100644
--- a/pkg/ccl/testccl/workload/schemachange/schema_change_external_test.go
+++ b/pkg/ccl/testccl/workload/schemachange/schema_change_external_test.go
@@ -21,6 +21,7 @@ import (
     _ "github.com/cockroachdb/cockroach/pkg/ccl"
     "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
     "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
+    "github.com/cockroachdb/cockroach/pkg/testutils/skip"
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/workload"
@@ -33,6 +34,7 @@ import (
 func TestWorkload(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer utilccl.TestingEnableEnterprise()()
+    skip.WithIssue(t, 78478)
 
     dir := t.TempDir()
     ctx := context.Background()

diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go
index b9d9c263a9fa..62ba988508c7 100644
--- a/pkg/cloud/gcp/gcs_storage.go
+++ b/pkg/cloud/gcp/gcs_storage.go
@@ -66,6 +66,15 @@ var gcsChunkingEnabled = settings.RegisterBoolSetting(
     true, /* default */
 )
 
+// gcsChunkRetryTimeout is used to configure the per-chunk retry deadline when
+// uploading chunks to Google Cloud Storage.
+var gcsChunkRetryTimeout = settings.RegisterDurationSetting(
+    settings.TenantWritable,
+    "cloudstorage.gs.chunking.retry_timeout",
+    "per-chunk retry deadline when chunking of file upload to Google Cloud Storage",
+    60*time.Second,
+)
+
 func parseGSURL(_ cloud.ExternalStorageURIContext, uri *url.URL) (cloudpb.ExternalStorage, error) {
     gsURL := cloud.ConsumeURL{URL: uri}
     conf := cloudpb.ExternalStorage{}
@@ -253,6 +262,7 @@ func (g *gcsStorage) Writer(ctx context.Context, basename string) (io.WriteClose
     if !gcsChunkingEnabled.Get(&g.settings.SV) {
         w.ChunkSize = 0
     }
+    w.ChunkRetryDeadline = gcsChunkRetryTimeout.Get(&g.settings.SV)
     return w, nil
 }
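Review note on the gcs_storage.go hunk above: the new cloudstorage.gs.chunking.retry_timeout setting feeds directly into the ChunkRetryDeadline field of the google-cloud-go storage Writer. Below is a minimal standalone sketch of that API outside CockroachDB; the bucket and object names, the helper name, and the 8 MiB chunk size are made up for illustration, and the 60s deadline mirrors the setting's default.

    package example

    import (
        "context"
        "time"

        "cloud.google.com/go/storage"
    )

    // uploadWithChunkRetryDeadline uploads data in fixed-size chunks and bounds
    // how long the client may spend retrying any single chunk before the writer
    // gives up, which is what gcsStorage.Writer now wires up from the setting.
    func uploadWithChunkRetryDeadline(
        ctx context.Context, client *storage.Client, data []byte,
    ) error {
        w := client.Bucket("my-bucket").Object("my-object").NewWriter(ctx)
        w.ChunkSize = 8 << 20                   // upload in 8 MiB chunks
        w.ChunkRetryDeadline = 60 * time.Second // per-chunk retry budget
        if _, err := w.Write(data); err != nil {
            _ = w.Close()
            return err
        }
        return w.Close()
    }

Operators who see chunk uploads failing on degraded networks can presumably raise the budget with a statement of the form SET CLUSTER SETTING cloudstorage.gs.chunking.retry_timeout = '90s'.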
diff --git a/pkg/sql/gcjob/BUILD.bazel b/pkg/sql/gcjob/BUILD.bazel
index fc05b14b936e..431d2b43177b 100644
--- a/pkg/sql/gcjob/BUILD.bazel
+++ b/pkg/sql/gcjob/BUILD.bazel
@@ -38,6 +38,7 @@ go_library(
         "//pkg/sql/pgwire/pgcode",
         "//pkg/sql/pgwire/pgerror",
         "//pkg/sql/sem/tree",
+        "//pkg/sql/sqlerrors",
         "//pkg/util/admission/admissionpb",
         "//pkg/util/hlc",
         "//pkg/util/log",

diff --git a/pkg/sql/gcjob/index_garbage_collection.go b/pkg/sql/gcjob/index_garbage_collection.go
index 8e63f08d11cd..22b3261543aa 100644
--- a/pkg/sql/gcjob/index_garbage_collection.go
+++ b/pkg/sql/gcjob/index_garbage_collection.go
@@ -23,6 +23,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+    "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/cockroachdb/errors"
 )
@@ -44,6 +45,10 @@ func deleteIndexData(
     // are no longer in use. This is necessary in the case of truncate, where we
     // schedule a GC Job in the transaction that commits the truncation.
     parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID)
+    if errors.Is(err, catalog.ErrDescriptorNotFound) {
+        handleTableDescriptorDeleted(ctx, parentID, progress)
+        return nil
+    }
     if err != nil {
         return err
     }
@@ -89,6 +94,10 @@ func gcIndexes(
     // are no longer in use. This is necessary in the case of truncate, where we
     // schedule a GC Job in the transaction that commits the truncation.
     parentDesc, err := sql.WaitToUpdateLeases(ctx, execCfg.LeaseManager, parentID)
+    if errors.Is(err, catalog.ErrDescriptorNotFound) {
+        handleTableDescriptorDeleted(ctx, parentID, progress)
+        return nil
+    }
     if err != nil {
         return err
     }
@@ -129,10 +138,15 @@ func gcIndexes(
             ctx, txn, execCfg, descriptors, freshParentTableDesc, []uint32{uint32(index.IndexID)},
         )
     }
-    if err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs); err != nil {
+    err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs)
+    if errors.Is(err, catalog.ErrDescriptorNotFound) ||
+        sqlerrors.IsUndefinedRelationError(err) {
+        handleTableDescriptorDeleted(ctx, parentID, progress)
+        return nil
+    }
+    if err != nil {
         return errors.Wrapf(err, "removing index %d zone configs", index.IndexID)
     }
-
     markIndexGCed(
         ctx, index.IndexID, progress, jobspb.SchemaChangeGCProgress_CLEARED,
     )
@@ -214,7 +228,8 @@ func deleteIndexZoneConfigsAfterGC(
     }
     err := sql.DescsTxn(ctx, execCfg, removeIndexZoneConfigs)
     switch {
-    case errors.Is(err, catalog.ErrDescriptorNotFound):
+    case errors.Is(err, catalog.ErrDescriptorNotFound),
+        sqlerrors.IsUndefinedRelationError(err):
         log.Infof(ctx, "removing index %d zone config from table %d failed: %v",
             index.IndexID, parentID, err)
     case err != nil:
@@ -226,3 +241,20 @@ func deleteIndexZoneConfigsAfterGC(
     }
     return nil
 }
+
+// handleTableDescriptorDeleted should be called when logic detects that
+// a table descriptor has been deleted while attempting to GC an index.
+// The function marks in progress that all indexes have been cleared.
+func handleTableDescriptorDeleted(
+    ctx context.Context, parentID descpb.ID, progress *jobspb.SchemaChangeGCProgress,
+) {
+    droppedIndexes := progress.Indexes
+    // If the descriptor has been removed, then we need to assume that the relevant
+    // zone configs and data have been cleaned up by another process.
+    log.Infof(ctx, "descriptor %d dropped, assuming another process has handled GC", parentID)
+    for _, index := range droppedIndexes {
+        markIndexGCed(
+            ctx, index.IndexID, progress, jobspb.SchemaChangeGCProgress_CLEARED,
+        )
+    }
+}
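Review note: the call sites above all gate on the same pair of error checks before treating the GC work as already done. As a condensed sketch, the predicate amounts to the following; the package and helper name are hypothetical and not part of this patch, while the two checks themselves are the ones the patch uses.

    package gcjobutil // hypothetical home for the helper

    import (
        "github.com/cockroachdb/cockroach/pkg/sql/catalog"
        "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
        "github.com/cockroachdb/errors"
    )

    // isBenignGCError reports whether a failure to resolve a table during index
    // GC means the descriptor is already gone. In that case the GC job marks its
    // indexes as cleared instead of failing: whoever deleted the descriptor is
    // assumed to have cleaned up the data and zone configs already.
    func isBenignGCError(err error) bool {
        return errors.Is(err, catalog.ErrDescriptorNotFound) ||
            sqlerrors.IsUndefinedRelationError(err)
    }

Centralizing the predicate like this would keep the call sites in sync; the patch instead spells the checks out inline at each site.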
diff --git a/pkg/sql/gcjob_test/BUILD.bazel b/pkg/sql/gcjob_test/BUILD.bazel
index 8ceb0892c437..4d0b4a1db8ce 100644
--- a/pkg/sql/gcjob_test/BUILD.bazel
+++ b/pkg/sql/gcjob_test/BUILD.bazel
@@ -33,6 +33,8 @@ go_test(
         "//pkg/sql/catalog/tabledesc",
         "//pkg/sql/gcjob",
         "//pkg/sql/gcjob/gcjobnotifier",
+        "//pkg/sql/sem/catid",
+        "//pkg/sql/sqlutil",
         "//pkg/testutils",
         "//pkg/testutils/jobutils",
         "//pkg/testutils/serverutils",

diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go
index d26bdbc932d0..709e61d5726f 100644
--- a/pkg/sql/gcjob_test/gc_job_test.go
+++ b/pkg/sql/gcjob_test/gc_job_test.go
@@ -39,6 +39,8 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
     "github.com/cockroachdb/cockroach/pkg/sql/gcjob"
     "github.com/cockroachdb/cockroach/pkg/sql/gcjob/gcjobnotifier"
+    "github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
+    "github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -506,6 +508,122 @@ func TestGCTenant(t *testing.T) {
     })
 }
 
+// This test exercises code whereby an index GC job is running, and, in the
+// meantime, the descriptor is removed. We want to ensure that the GC job
+// finishes without an error.
+func TestDropIndexWithDroppedDescriptor(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    // The way the GC job works is that it initially clears the index
+    // data, then it waits for the background MVCC GC to run and remove
+    // the underlying tombstone, and then finally it removes any relevant
+    // zone configurations for the index from system.zones. In the first
+    // and final phases, the job resolves the descriptor. This test ensures
+    // that the code is robust to the descriptor being removed both before
+    // the initial DelRange, and after, when going to remove the zone config.
+    testutils.RunTrueAndFalse(t, "before DelRange", func(
+        t *testing.T, beforeDelRange bool,
+    ) {
+        ctx, cancel := context.WithCancel(context.Background())
+        gcJobID := make(chan jobspb.JobID)
+        knobs := base.TestingKnobs{
+            JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+            GCJob: &sql.GCJobTestingKnobs{
+                RunBeforeResume: func(jobID jobspb.JobID) error {
+                    select {
+                    case <-ctx.Done():
+                        return ctx.Err()
+                    case gcJobID <- jobID:
+                        return nil
+                    }
+                },
+                SkipWaitingForMVCCGC: true,
+            },
+        }
+        delRangeChan := make(chan chan struct{})
+        var tablePrefix atomic.Value
+        tablePrefix.Store(roachpb.Key{})
+        // If not running beforeDelRange, we want to delete the descriptor during
+        // the DeleteRange operation. To do this, we install the below testing knob.
+        if !beforeDelRange {
+            knobs.Store = &kvserver.StoreTestingKnobs{
+                TestingRequestFilter: func(
+                    ctx context.Context, request roachpb.BatchRequest,
+                ) *roachpb.Error {
+                    req, ok := request.GetArg(roachpb.DeleteRange)
+                    if !ok {
+                        return nil
+                    }
+                    dr := req.(*roachpb.DeleteRangeRequest)
+                    if !dr.UseRangeTombstone {
+                        return nil
+                    }
+                    k := tablePrefix.Load().(roachpb.Key)
+                    if len(k) == 0 {
+                        return nil
+                    }
+                    ch := make(chan struct{})
+                    select {
+                    case delRangeChan <- ch:
+                    case <-ctx.Done():
+                    }
+                    select {
+                    case <-ch:
+                    case <-ctx.Done():
+                    }
+                    return nil
+                },
+            }
+        }
+        s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{
+            Knobs: knobs,
+        })
+        defer s.Stopper().Stop(ctx)
+        defer cancel()
+        tdb := sqlutils.MakeSQLRunner(sqlDB)
+
+        // Create the table and index to be dropped.
+        tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY, j INT, INDEX(j, i))")
+        // Store the relevant IDs to make it easy to intercept the DelRange.
+        var tableID catid.DescID
+        var indexID catid.IndexID
+        tdb.QueryRow(t, `
+SELECT descriptor_id, index_id
+  FROM crdb_internal.table_indexes
+ WHERE descriptor_name = 'foo'
+   AND index_name = 'foo_j_i_idx';`).Scan(&tableID, &indexID)
+        // Drop the index.
+        tdb.Exec(t, "DROP INDEX foo@foo_j_i_idx")
+        codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
+        tablePrefix.Store(codec.TablePrefix(uint32(tableID)))
+
+        deleteDescriptor := func(t *testing.T) {
+            t.Helper()
+            k := catalogkeys.MakeDescMetadataKey(codec, tableID)
+            _, err := kvDB.Del(ctx, k)
+            require.NoError(t, err)
+        }
+
+        // Delete the descriptor either before the initial job run, or after
+        // the job has started, but during the sending of DeleteRange requests.
+        var jobID jobspb.JobID
+        if beforeDelRange {
+            deleteDescriptor(t)
+            jobID = <-gcJobID
+        } else {
+            jobID = <-gcJobID
+            ch := <-delRangeChan
+            deleteDescriptor(t)
+            close(ch)
+        }
+        // Ensure that the job completes successfully in either case.
+        require.NoError(t, s.JobRegistry().(*jobs.Registry).WaitForJobs(
+            ctx, s.InternalExecutor().(sqlutil.InternalExecutor), []jobspb.JobID{jobID},
+        ))
+    })
+}
+
 // TestGCJobNoSystemConfig tests that the GC job is robust to running with
 // no system config provided by the SystemConfigProvider. It is a regression
 // test for a panic which could occur due to a slow systemconfigwatcher
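Review note on the test's synchronization: the TestingRequestFilter above pauses the intercepted DeleteRange with a channel-of-channels handshake, so the test body can mutate state while the request is provably in flight. Here is a runnable, CockroachDB-free sketch of just that pattern; all names are illustrative.

    package main

    import "fmt"

    // main demonstrates the handshake: the filter side sends a fresh channel
    // to announce the paused request, the test side receives it, does its work
    // while the request is blocked, and closes it to let the request proceed.
    func main() {
        pauseCh := make(chan chan struct{})
        done := make(chan struct{})

        // Stand-in for the TestingRequestFilter goroutine.
        go func() {
            ch := make(chan struct{})
            pauseCh <- ch // announce that the intercepted request arrived
            <-ch          // block until the test says go
            fmt.Println("request proceeds")
            close(done)
        }()

        ch := <-pauseCh // the request is now paused
        fmt.Println("deleting descriptor while the request is blocked")
        close(ch) // unblock the request
        <-done
    }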
diff --git a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant
index 696bab428d46..f6c01d180ddb 100644
--- a/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant
+++ b/pkg/sql/logictest/testdata/logic_test/builtin_function_notenant
@@ -4,20 +4,20 @@ subtest check_consistency
 
 # Sanity-check crdb_internal.check_consistency.
 
-statement error start key must be >= "\\x02"
-SELECT crdb_internal.check_consistency(true, '\x01', '\xffff')
+statement error start key must be > "\\x02"
+SELECT crdb_internal.check_consistency(true, '\x02', '\xffff')
 
 statement error end key must be < "\\xff\\xff"
-SELECT crdb_internal.check_consistency(true, '\x02', '\xffff00')
+SELECT crdb_internal.check_consistency(true, '\x0200', '\xffff00')
 
 statement error start key must be less than end key
-SELECT crdb_internal.check_consistency(true, '\x02', '\x02')
+SELECT crdb_internal.check_consistency(true, '\x03', '\x03')
 
 statement error start key must be less than end key
-SELECT crdb_internal.check_consistency(true, '\x03', '\x02')
+SELECT crdb_internal.check_consistency(true, '\x04', '\x03')
 
 query ITT
-SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x02', '\xffff') WHERE range_id = 1
+SELECT range_id, status, regexp_replace(detail, '[0-9]+', '', 'g') FROM crdb_internal.check_consistency(true, '\x03', '\xffff') WHERE range_id = 1
 ----
 1  RANGE_CONSISTENT  stats: {ContainsEstimates: LastUpdateNanos: IntentAge: GCBytesAge: LiveBytes: LiveCount: KeyBytes: KeyCount: ValBytes: ValCount: IntentBytes: IntentCount: SeparatedIntentCount: RangeKeyCount: RangeKeyBytes: RangeValCount: RangeValBytes: SysBytes: SysCount: AbortSpanBytes:}
@@ -32,14 +32,18 @@ SELECT count(*) > 5 FROM crdb_internal.check_consistency(true, '', '')
 true
 
 # Query that should touch only a single range.
+#
+# NB: the use of ScanMetaKVs causes issues here. Bounds [`k`, `k.Next()`] don't work,
+# with errors such as (here k=\xff):
+# pq: failed to verify keys for Scan: end key /Meta2/"\xff\x00" must be greater than start /Meta2/"\xff\x00"
 query B
-SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\x03', '\x0300')
+SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '\xff', '\xffff')
 ----
 true
 
-# Ditto, but implicit start key \x02
+# Ditto, but implicit start key \x03
 query B
-SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x0200')
+SELECT count(*) = 1 FROM crdb_internal.check_consistency(true, '', '\x04')
 ----
 true

diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go
index 84c7a2bac85d..b07f58e27771 100644
--- a/pkg/sql/sem/builtins/fixed_oids.go
+++ b/pkg/sql/sem/builtins/fixed_oids.go
@@ -370,7 +370,7 @@ var builtinOidsBySignature = map[string]oid.Oid{
     `crdb_internal.active_version() -> jsonb`: 1296,
     `crdb_internal.approximate_timestamp(timestamp: decimal) -> timestamp`: 1298,
     `crdb_internal.assignment_cast(val: anyelement, type: anyelement) -> anyelement`: 1341,
-    `crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) -> tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail}`: 347,
+    `crdb_internal.check_consistency(stats_only: bool, start_key: bytes, end_key: bytes) -> tuple{int AS range_id, bytes AS start_key, string AS start_key_pretty, string AS status, string AS detail, interval AS duration}`: 347,
     `crdb_internal.check_password_hash_format(password: bytes) -> string`: 1376,
     `crdb_internal.cluster_id() -> uuid`: 1299,
     `crdb_internal.cluster_name() -> string`: 1301,

diff --git a/pkg/sql/sem/builtins/generator_builtins.go b/pkg/sql/sem/builtins/generator_builtins.go
index 6cb368df72f0..7dcdaf18300f 100644
--- a/pkg/sql/sem/builtins/generator_builtins.go
+++ b/pkg/sql/sem/builtins/generator_builtins.go
@@ -35,6 +35,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/errorutil"
     "github.com/cockroachdb/cockroach/pkg/util/json"
     "github.com/cockroachdb/cockroach/pkg/util/mon"
+    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/cockroach/pkg/util/tracing"
     "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
     "github.com/cockroachdb/errors"
@@ -1827,13 +1828,26 @@ func (j *jsonRecordSetGenerator) Next(ctx context.Context) (bool, error) {
 }
 
 type checkConsistencyGenerator struct {
+    txn                *kv.Txn // to load range descriptors
     consistencyChecker eval.ConsistencyCheckRunner
     from, to           roachpb.Key
     mode               roachpb.ChecksumMode
-    // remainingRows is populated by Start(). Each Next() call peels of the first
-    // row and moves it to curRow.
-    remainingRows []roachpb.CheckConsistencyResponse_Result
-    curRow        roachpb.CheckConsistencyResponse_Result
+
+    // The descriptors for which we haven't yet emitted rows. Rows are consumed
+    // from this field and produce one (or more, in the case of splits not reflected
+    // in the descriptor) rows in `next`.
+    descs []roachpb.RangeDescriptor
+    // The current row, emitted by Values().
+    cur roachpb.CheckConsistencyResponse_Result
+    // The time it took to produce the current row, i.e. how long it took to run
+    // the consistency check that produced the row. When a consistency check
+    // produces more than one row (i.e. after a split), all of the duration will
+    // be attributed to the first row.
+    dur time.Duration
+    // next are the potentially prefetched subsequent rows. This is usually empty
+    // (as one consistency check produces one result which immediately moves to
+    // `cur`) except when a descriptor we use doesn't reflect subsequent splits.
+    next []roachpb.CheckConsistencyResponse_Result
 }
 
 var _ eval.ValueGenerator = &checkConsistencyGenerator{}
@@ -1850,14 +1864,18 @@ func makeCheckConsistencyGenerator(
     keyTo := roachpb.Key(*args[2].(*tree.DBytes))
 
     if len(keyFrom) == 0 {
-        keyFrom = keys.LocalMax
+        // NB: you'd expect LocalMax here but when we go and call ScanMetaKVs, it
+        // would interpret LocalMax as Meta1Prefix and translate that to KeyMin,
+        // then fail on the scan. That method should really handle this better
+        // but also we should use IterateRangeDescriptors instead.
+        keyFrom = keys.Meta2Prefix
     }
     if len(keyTo) == 0 {
         keyTo = roachpb.KeyMax
     }
 
-    if bytes.Compare(keyFrom, keys.LocalMax) < 0 {
-        return nil, errors.Errorf("start key must be >= %q", []byte(keys.LocalMax))
+    if bytes.Compare(keyFrom, keys.LocalMax) <= 0 {
+        return nil, errors.Errorf("start key must be > %q", []byte(keys.LocalMax))
     }
     if bytes.Compare(keyTo, roachpb.KeyMax) > 0 {
         return nil, errors.Errorf("end key must be < %q", []byte(roachpb.KeyMax))
@@ -1872,6 +1890,7 @@ func makeCheckConsistencyGenerator(
     }
     return &checkConsistencyGenerator{
+        txn:                ctx.Txn,
         consistencyChecker: ctx.ConsistencyChecker,
         from:               keyFrom,
         to:                 keyTo,
@@ -1880,8 +1899,8 @@ func makeCheckConsistencyGenerator(
 
 var checkConsistencyGeneratorType = types.MakeLabeledTuple(
-    []*types.T{types.Int, types.Bytes, types.String, types.String, types.String},
-    []string{"range_id", "start_key", "start_key_pretty", "status", "detail"},
+    []*types.T{types.Int, types.Bytes, types.String, types.String, types.String, types.Interval},
+    []string{"range_id", "start_key", "start_key_pretty", "status", "detail", "duration"},
 )
 
 // ResolvedType is part of the tree.ValueGenerator interface.
@@ -1891,32 +1910,94 @@ func (*checkConsistencyGenerator) ResolvedType() *types.T {
 
 // Start is part of the tree.ValueGenerator interface.
-func (c *checkConsistencyGenerator) Start(ctx context.Context, _ *kv.Txn) error {
-    resp, err := c.consistencyChecker.CheckConsistency(ctx, c.from, c.to, c.mode)
+func (c *checkConsistencyGenerator) Start(ctx context.Context, _ *kv.Txn) error {
+    span := roachpb.Span{Key: c.from, EndKey: c.to}
+    // NB: should use IterateRangeDescriptors here which is in the 'upgrade'
+    // package to avoid pulling all into memory. That needs a refactor, though.
+    // kvprober also has some code to iterate in batches.
+    descs, err := kvclient.ScanMetaKVs(ctx, c.txn, span)
     if err != nil {
         return err
     }
-    c.remainingRows = resp.Result
+    for _, v := range descs {
+        var desc roachpb.RangeDescriptor
+        if err := v.ValueProto(&desc); err != nil {
+            return err
+        }
+        if len(desc.StartKey) == 0 {
+            desc.StartKey = keys.MustAddr(keys.LocalMax)
+            // Elide potential second copy we might be getting for r1
+            // if meta1 and meta2 haven't split.
+            // This too should no longer be necessary with IterateRangeDescriptors.
+            if len(c.descs) == 1 {
+                continue
+            }
+        }
+        c.descs = append(c.descs, desc)
+    }
     return nil
 }
 
+// maybeRefillRows checks whether c.next is empty and if so, consumes the first
+// element of c.descs for a consistency check. This populates c.next with at
+// least one result (even on error). Returns the duration of the consistency
+// check, if any, and zero otherwise.
+func (c *checkConsistencyGenerator) maybeRefillRows(ctx context.Context) time.Duration {
+    if len(c.next) > 0 || len(c.descs) == 0 {
+        // We have a row to produce or no more ranges to check, so we're done
+        // for now or for good, respectively.
+        return 0
+    }
+    tBegin := timeutil.Now()
+    // NB: peeling off the spans one by one allows this generator to produce
+    // rows in a streaming manner. If we called CheckConsistency(c.from, c.to)
+    // we would only get the result once all checks have completed and it will
+    // generally be a lot more brittle since an error will completely wipe out
+    // the result set.
+    desc := c.descs[0]
+    c.descs = c.descs[1:]
+    resp, err := c.consistencyChecker.CheckConsistency(
+        ctx, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey(), c.mode,
+    )
+    if err != nil {
+        resp = &roachpb.CheckConsistencyResponse{Result: []roachpb.CheckConsistencyResponse_Result{{
+            RangeID:  desc.RangeID,
+            StartKey: desc.StartKey,
+            Status:   roachpb.CheckConsistencyResponse_RANGE_INDETERMINATE,
+            Detail:   err.Error(),
+        }}}
+    }
+
+    // NB: this could have more than one entry, if a range split in the
+    // meantime.
+    c.next = resp.Result
+    return timeutil.Since(tBegin)
+}
+
 // Next is part of the tree.ValueGenerator interface.
-func (c *checkConsistencyGenerator) Next(_ context.Context) (bool, error) {
-    if len(c.remainingRows) == 0 {
+func (c *checkConsistencyGenerator) Next(ctx context.Context) (bool, error) {
+    dur := c.maybeRefillRows(ctx)
+    if len(c.next) == 0 {
         return false, nil
     }
-    c.curRow = c.remainingRows[0]
-    c.remainingRows = c.remainingRows[1:]
+    c.dur, c.cur, c.next = dur, c.next[0], c.next[1:]
     return true, nil
 }
 
 // Values is part of the tree.ValueGenerator interface.
 func (c *checkConsistencyGenerator) Values() (tree.Datums, error) {
+    row := c.cur
+    intervalMeta := types.IntervalTypeMetadata{
+        DurationField: types.IntervalDurationField{
+            DurationType: types.IntervalDurationType_MILLISECOND,
+        },
+    }
     return tree.Datums{
-        tree.NewDInt(tree.DInt(c.curRow.RangeID)),
-        tree.NewDBytes(tree.DBytes(c.curRow.StartKey)),
-        tree.NewDString(roachpb.Key(c.curRow.StartKey).String()),
-        tree.NewDString(c.curRow.Status.String()),
-        tree.NewDString(c.curRow.Detail),
+        tree.NewDInt(tree.DInt(row.RangeID)),
+        tree.NewDBytes(tree.DBytes(row.StartKey)),
+        tree.NewDString(roachpb.Key(row.StartKey).String()),
+        tree.NewDString(row.Status.String()),
+        tree.NewDString(row.Detail),
+        tree.NewDInterval(duration.MakeDuration(c.dur.Nanoseconds(), 0 /* days */, 0 /* months */), intervalMeta),
     }, nil
 }
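Taken together, the generator changes mean a session can watch per-range consistency results arrive one by one, each annotated with how long its check took, instead of waiting for the whole span to finish. A client-side sketch against a locally running cluster follows; the connection string is hypothetical, and the INTERVAL column is scanned into a plain string to keep the example dependency-light.

    package main

    import (
        "database/sql"
        "fmt"
        "log"
        "time"

        _ "github.com/lib/pq"
    )

    func main() {
        db, err := sql.Open("postgres",
            "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        rows, err := db.Query(
            `SELECT range_id, status, duration
               FROM crdb_internal.check_consistency(true, '', '')`)
        if err != nil {
            log.Fatal(err)
        }
        defer rows.Close()
        start := time.Now()
        for rows.Next() {
            var rangeID int64
            var status, dur string
            if err := rows.Scan(&rangeID, &status, &dur); err != nil {
                log.Fatal(err)
            }
            // With the streaming generator, rows can be observed as each
            // per-range check finishes rather than all at once at the end.
            fmt.Printf("r%d: %s (check took %s, seen after %s)\n",
                rangeID, status, dur, time.Since(start).Round(time.Millisecond))
        }
        if err := rows.Err(); err != nil {
            log.Fatal(err)
        }
    }

Because a failed check on one range now yields a RANGE_INDETERMINATE row instead of aborting the whole call, the loop above keeps receiving results for the remaining ranges.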