Skip to content

Commit

Permalink
sql/schemachanger: plumb context, check for cancelation sometimes
Browse files Browse the repository at this point in the history
Fixes #87246

This will also improve tracing.

Release note: None
  • Loading branch information
ajwerner committed Sep 23, 2022
1 parent 14f11eb commit 8778f63
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/declarative_corpus.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ a given corpus file.
return jobID
},
}
_, err := scplan.MakePlan(*state, params)
_, err := scplan.MakePlan(cmd.Context(), *state, params)
if err != nil {
fmt.Printf("failed to validate %s with error %v\n", name, err)
} else {
Expand Down
8 changes: 5 additions & 3 deletions pkg/sql/explain_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,17 @@ func (n *explainDDLNode) startExec(params runParams) error {
return explainNotPossibleError
}
}
return n.setExplainValues(scNode.plannedState)
return n.setExplainValues(params.ctx, scNode.plannedState)
}

func (n *explainDDLNode) setExplainValues(scState scpb.CurrentState) (err error) {
func (n *explainDDLNode) setExplainValues(
ctx context.Context, scState scpb.CurrentState,
) (err error) {
defer func() {
err = errors.WithAssertionFailure(err)
}()
var p scplan.Plan
p, err = scplan.MakePlan(scState, scplan.Params{
p, err = scplan.MakePlan(ctx, scState, scplan.Params{
ExecutionPhase: scop.StatementPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/schemachanger/corpus/corpus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package corpus_test

import (
"context"
"flag"
"testing"

Expand Down Expand Up @@ -40,7 +41,7 @@ func TestValidateCorpuses(t *testing.T) {
jobID := jobspb.InvalidJobID
name, state := reader.GetCorpus(corpusIdx)
t.Run(name, func(t *testing.T) {
_, err := scplan.MakePlan(*state, scplan.Params{
_, err := scplan.MakePlan(context.Background(), *state, scplan.Params{
ExecutionPhase: scop.LatestPhase,
InRollback: state.InRollback,
SchemaChangerJobIDSupplier: func() jobspb.JobID {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/scdeps/sctestutils/sctestutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func ProtoDiff(a, b protoutil.Message, args DiffArgs, rewrites func(interface{})

// MakePlan is a convenient alternative to calling scplan.MakePlan in tests.
func MakePlan(t *testing.T, state scpb.CurrentState, phase scop.Phase) scplan.Plan {
plan, err := scplan.MakePlan(state, scplan.Params{
plan, err := scplan.MakePlan(context.Background(), state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return 1 },
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/schemachanger/scplan/internal/opgen/op_gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,19 @@ func IterateTransitions(

// BuildGraph constructs a graph with operation edges populated from an initial
// state.
func BuildGraph(cs scpb.CurrentState) (*scgraph.Graph, error) {
return opRegistry.buildGraph(cs)
func BuildGraph(ctx context.Context, cs scpb.CurrentState) (*scgraph.Graph, error) {
return opRegistry.buildGraph(ctx, cs)
}

func (r *registry) buildGraph(cs scpb.CurrentState) (_ *scgraph.Graph, err error) {
func (r *registry) buildGraph(
ctx context.Context, cs scpb.CurrentState,
) (_ *scgraph.Graph, err error) {
start := timeutil.Now()
defer func() {
if err != nil || !log.V(2) {
if err != nil || !log.ExpensiveLogEnabled(ctx, 2) {
return
}
log.Infof(context.TODO(), "operation graph generation took %v", timeutil.Since(start))
log.Infof(ctx, "operation graph generation took %v", timeutil.Since(start))
}()
g, err := scgraph.New(cs)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions pkg/sql/schemachanger/scplan/internal/rules/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// ApplyDepRules adds dependency edges to the graph according to the
// registered dependency rules.
func ApplyDepRules(g *scgraph.Graph) error {
func ApplyDepRules(ctx context.Context, g *scgraph.Graph) error {
for _, dr := range registry.depRules {
start := timeutil.Now()
var added int
Expand All @@ -41,9 +41,15 @@ func ApplyDepRules(g *scgraph.Graph) error {
}); err != nil {
return errors.Wrapf(err, "applying dep rule %s", dr.name)
}
if log.V(2) {
// Applying the dep rules can be slow in some cases. Check for
// cancellation when applying the rules to ensure we don't spin for
// too long while the user is waiting for the task to exit cleanly.
if ctx.Err() != nil {
return ctx.Err()
}
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(
context.TODO(), "applying dep rule %s %d took %v",
ctx, "applying dep rule %s %d took %v",
dr.name, added, timeutil.Since(start),
)
}
Expand All @@ -53,7 +59,7 @@ func ApplyDepRules(g *scgraph.Graph) error {

// ApplyOpRules marks op edges as no-op in a shallow copy of the graph according
// to the registered rules.
func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) {
func ApplyOpRules(ctx context.Context, g *scgraph.Graph) (*scgraph.Graph, error) {
db := g.Database()
m := make(map[*screl.Node][]scgraph.RuleName)
for _, rule := range registry.opRules {
Expand All @@ -68,9 +74,9 @@ func ApplyOpRules(g *scgraph.Graph) (*scgraph.Graph, error) {
if err != nil {
return nil, errors.Wrapf(err, "applying op rule %s", rule.name)
}
if log.V(2) {
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(
context.TODO(), "applying op rule %s %d took %v",
ctx, "applying op rule %s %d took %v",
rule.name, added, timeutil.Since(start),
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/schemachanger/scplan/internal/scstage/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package scstage

import (
"context"
"fmt"
"sort"
"strings"
Expand All @@ -31,7 +32,11 @@ import (
// Note that the scJobIDSupplier function is idempotent, and must return the
// same value for all calls.
func BuildStages(
init scpb.CurrentState, phase scop.Phase, g *scgraph.Graph, scJobIDSupplier func() jobspb.JobID,
ctx context.Context,
init scpb.CurrentState,
phase scop.Phase,
g *scgraph.Graph,
scJobIDSupplier func() jobspb.JobID,
) []Stage {
c := buildContext{
rollback: init.InRollback,
Expand Down
28 changes: 14 additions & 14 deletions pkg/sql/schemachanger/scplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,19 @@ func (p Plan) StagesForCurrentPhase() []scstage.Stage {

// MakePlan generates a Plan for a particular phase of a schema change, given
// the initial state for a set of targets. Returns an error when planning fails.
func MakePlan(initial scpb.CurrentState, params Params) (p Plan, err error) {
func MakePlan(ctx context.Context, initial scpb.CurrentState, params Params) (p Plan, err error) {
p = Plan{
CurrentState: initial,
Params: params,
}
err = makePlan(&p)
if err != nil {
err = makePlan(ctx, &p)
if err != nil && ctx.Err() == nil {
err = p.DecorateErrorWithPlanDetails(err)
}
return p, err
}

func makePlan(p *Plan) (err error) {
func makePlan(ctx context.Context, p *Plan) (err error) {
defer func() {
if r := recover(); r != nil {
rAsErr, ok := r.(error)
Expand All @@ -99,18 +99,18 @@ func makePlan(p *Plan) (err error) {
}()
{
start := timeutil.Now()
p.Graph = buildGraph(p.CurrentState)
if log.V(2) {
log.Infof(context.TODO(), "graph generation took %v", timeutil.Since(start))
p.Graph = buildGraph(ctx, p.CurrentState)
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "graph generation took %v", timeutil.Since(start))
}
}
{
start := timeutil.Now()
p.Stages = scstage.BuildStages(
p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier,
ctx, p.CurrentState, p.Params.ExecutionPhase, p.Graph, p.Params.SchemaChangerJobIDSupplier,
)
if log.V(2) {
log.Infof(context.TODO(), "stage generation took %v", timeutil.Since(start))
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "stage generation took %v", timeutil.Since(start))
}
}
if n := len(p.Stages); n > 0 && p.Stages[n-1].Phase > scop.PreCommitPhase {
Expand All @@ -123,20 +123,20 @@ func makePlan(p *Plan) (err error) {
return nil
}

func buildGraph(cs scpb.CurrentState) *scgraph.Graph {
g, err := opgen.BuildGraph(cs)
func buildGraph(ctx context.Context, cs scpb.CurrentState) *scgraph.Graph {
g, err := opgen.BuildGraph(ctx, cs)
if err != nil {
panic(errors.Wrapf(err, "build graph op edges"))
}
err = rules.ApplyDepRules(g)
err = rules.ApplyDepRules(ctx, g)
if err != nil {
panic(errors.Wrapf(err, "build graph dep edges"))
}
err = g.Validate()
if err != nil {
panic(errors.Wrapf(err, "validate graph"))
}
g, err = rules.ApplyOpRules(g)
g, err = rules.ApplyOpRules(ctx, g)
if err != nil {
panic(errors.Wrapf(err, "mark op edges as no-op"))
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schemachanger/scrun/scrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func runTransactionPhase(
if len(state.Current) == 0 {
return scpb.CurrentState{}, jobspb.InvalidJobID, nil
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: phase,
SchemaChangerJobIDSupplier: deps.TransactionalJobRegistry().SchemaChangerJobID,
})
Expand Down Expand Up @@ -112,7 +112,7 @@ func RunSchemaChangesInJob(
}
return errors.Wrapf(err, "failed to construct state for job %d", jobID)
}
sc, err := scplan.MakePlan(state, scplan.Params{
sc, err := scplan.MakePlan(ctx, state, scplan.Params{
ExecutionPhase: scop.PostCommitPhase,
SchemaChangerJobIDSupplier: func() jobspb.JobID { return jobID },
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schemachanger/sctest/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func checkExplainDiagrams(
params.InRollback = true
params.ExecutionPhase = scop.PostCommitNonRevertiblePhase
}
pl, err := scplan.MakePlan(state, params)
pl, err := scplan.MakePlan(context.Background(), state, params)
require.NoErrorf(t, err, "%s: %s", fileNameSuffix, explainedStmt)
action(explainDir, "ddl", pl.ExplainCompact)
action(explainVerboseDir, "ddl, verbose", pl.ExplainVerbose)
Expand Down

0 comments on commit 8778f63

Please sign in to comment.