Skip to content

Commit

Permalink
Merge #78184 #78195
Browse files Browse the repository at this point in the history
78184: workload/schemachanger: primary index detection is incorrect r=fqazi a=fqazi

Fixes: #77885

Previously, the schemachanger workload incorrectly detected primary
indexes using their names checking for either the words
'pkey' or 'primary'. This approach was not correct and heuristic
in nature. To address this, this patch uses crdb_internal.table_indexes
to detect primary indexes.

Release note: None

78195: distsql: clean up the determination of txn type in mixed version r=yuzefovich a=yuzefovich

This commit cleans up the way we determine what txn type to use for
a particular flow.

Addresses: #78150.

Release note: None

Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
  • Loading branch information
3 people committed Mar 22, 2022
3 parents 54671e7 + 5b3be20 + 65e6615 commit 420c418
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 35 deletions.
1 change: 0 additions & 1 deletion pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"//pkg/sql/execinfrapb",
"//pkg/sql/faketreeeval",
"//pkg/sql/flowinfra",
"//pkg/sql/row",
"//pkg/sql/rowflow",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
Expand Down
22 changes: 4 additions & 18 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/faketreeeval"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowflow"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand Down Expand Up @@ -407,24 +406,11 @@ func (ds *ServerImpl) setupFlow(
}

// Figure out what txn the flow needs to run in, if any. For gateway flows
// that have no remote flows and also no concurrency, the txn comes from
// localState.Txn. Otherwise, we create a txn based on the request's
// LeafTxnInputState.
useLeaf := false
if req.LeafTxnInputState != nil && row.CanUseStreamer(ctx, ds.Settings) {
for _, proc := range req.Flow.Processors {
if jr := proc.Core.JoinReader; jr != nil {
if jr.IsIndexJoin() {
// Index joins are executed via the Streamer API that has
// concurrency.
useLeaf = true
break
}
}
}
}
// that have no remote flows and also no concurrency, the (root) txn comes
// from localState.Txn if we haven't already created a leaf txn. Otherwise,
// we create, if necessary, a txn based on the request's LeafTxnInputState.
var txn *kv.Txn
if localState.IsLocal && !f.ConcurrentTxnUse() && !useLeaf {
if localState.IsLocal && !f.ConcurrentTxnUse() && leafTxn == nil {
txn = localState.Txn
} else {
// If I haven't created the leaf already, do it now.
Expand Down
41 changes: 25 additions & 16 deletions pkg/workload/schemachange/error_screening.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,22 @@ func (og *operationGenerator) colIsPrimaryKey(
ctx context.Context, tx pgx.Tx, tableName *tree.TableName, columnName string,
) (bool, error) {
primaryColumns, err := og.scanStringArray(ctx, tx,
`SELECT array_agg(column_name)
FROM (
SELECT DISTINCT column_name
FROM information_schema.statistics
WHERE (index_name = 'primary' OR index_name LIKE '%pkey%')
AND table_schema = $1
AND table_name = $2
AND storing = 'NO'
);
`, tableName.Schema(), tableName.Object())
`
SELECT array_agg(column_name)
FROM (
SELECT DISTINCT column_name
FROM information_schema.statistics
WHERE index_name
IN (
SELECT index_name
FROM crdb_internal.table_indexes
WHERE index_type = 'primary' AND descriptor_id = $3::REGCLASS
)
AND table_schema = $1
AND table_name = $2
AND storing = 'NO'
);
`, tableName.Schema(), tableName.Object(), tableName.String())
if err != nil {
return false, err
}
Expand Down Expand Up @@ -282,15 +288,18 @@ func (og *operationGenerator) tableHasPrimaryKeySwapActive(
indexName, err := og.scanStringArray(
ctx,
tx,
fmt.Sprintf(`
`
SELECT array_agg(index_name)
FROM (
SELECT index_name
FROM [SHOW INDEXES FROM %s]
WHERE index_name LIKE '%%_pkey%%'
LIMIT 1
SELECT
index_name
FROM
crdb_internal.table_indexes
WHERE
index_type = 'primary'
AND descriptor_id = $1::REGCLASS
);
`, tableName.String()),
`, tableName.String(),
)
if err != nil {
return err
Expand Down

0 comments on commit 420c418

Please sign in to comment.