Skip to content

Commit

Permalink
Delete using subselect, ISSUE-952
Browse files Browse the repository at this point in the history
  • Loading branch information
abador committed May 26, 2022
1 parent 2321574 commit 0580b63
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 44 deletions.
6 changes: 5 additions & 1 deletion cmd/cleanup/sql.go
Expand Up @@ -16,6 +16,8 @@ import (
"fmt"
"time"

"github.com/ory/x/cmdx"

"github.com/spf13/cobra"

"github.com/ory/kratos/driver/config"
Expand All @@ -38,11 +40,13 @@ You can read in the database URL using the -e flag, for example:
### WARNING ###
Before running this command on an existing database, create a back up!
`,
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
err := cliclient.NewCleanupHandler().CleanupSQL(cmd, args)
if err != nil {
fmt.Fprintln(cmd.OutOrStdout(), err)
return cmdx.FailSilently(cmd)
}
return nil
},
}

Expand Down
24 changes: 24 additions & 0 deletions cmd/cleanup/sql_test.go
@@ -0,0 +1,24 @@
package cleanup

import (
"bytes"
"io/ioutil"
"strings"
"testing"
)

func Test_ExecuteCleanupFailedDSN(t *testing.T) {
cmd := NewCleanupSQLCmd()
b := bytes.NewBufferString("")
cmd.SetOut(b)
cmd.SetArgs([]string{"--read-from-env=false"})
cmd.Execute()
out, err := ioutil.ReadAll(b)
if err != nil {
t.Fatal(err)
}
if !strings.Contains(string(out), "expected to get the DSN as an argument") {
t.Fatalf("expected \"%s\" got \"%s\"", "expected to get the DSN as an argument", string(out))
}
cmd.Execute()
}
41 changes: 16 additions & 25 deletions cmd/cliclient/cleanup.go
@@ -1,8 +1,6 @@
package cliclient

import (
"fmt"

"github.com/pkg/errors"

"github.com/ory/x/configx"
Expand All @@ -11,7 +9,6 @@ import (

"github.com/ory/kratos/driver"
"github.com/ory/kratos/driver/config"
"github.com/ory/x/cmdx"
"github.com/ory/x/flagx"
)

Expand All @@ -22,31 +19,25 @@ func NewCleanupHandler() *CleanupHandler {
}

func (h *CleanupHandler) CleanupSQL(cmd *cobra.Command, args []string) error {
var d driver.Registry
opts := []configx.OptionModifier{
configx.WithFlags(cmd.Flags()),
configx.SkipValidation(),
}

if flagx.MustGetBool(cmd, "read-from-env") {
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation())
if len(d.Config(cmd.Context()).DSN()) == 0 {
fmt.Fprintln(cmd.OutOrStdout(), cmd.UsageString())
fmt.Fprintln(cmd.OutOrStdout(), "")
fmt.Fprintln(cmd.OutOrStdout(), "When using flag -e, environment variable DSN must be set")
return cmdx.FailSilently(cmd)
}
} else {
if !flagx.MustGetBool(cmd, "read-from-env") {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
return cmdx.FailSilently(cmd)
return errors.New(`expected to get the DSN as an argument, or the "read-from-env" flag`)
}
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation(),
configx.WithValue(config.ViperKeyDSN, args[0]))
opts = append(opts, configx.WithValue(config.ViperKeyDSN, args[0]))
}

d := driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
opts...,
)
if len(d.Config(cmd.Context()).DSN()) == 0 {
return errors.New(`required config value "dsn" was not set`)
}

err := d.Init(cmd.Context(), driver.SkipNetworkInit)
Expand Down
2 changes: 1 addition & 1 deletion continuity/persistence.go
Expand Up @@ -15,5 +15,5 @@ type Persister interface {
SaveContinuitySession(ctx context.Context, c *Container) error
GetContinuitySession(ctx context.Context, id uuid.UUID) (*Container, error)
DeleteContinuitySession(ctx context.Context, id uuid.UUID) error
DeleteExpiredContinuitySessions(context.Context, time.Time, int) error
DeleteExpiredContinuitySessions(ctx context.Context, deleteOlder time.Time, pageSize int) error
}
4 changes: 2 additions & 2 deletions continuity/test/persistence.go
Expand Up @@ -97,10 +97,10 @@ func TestPersister(ctx context.Context, p interface {

t.Run("case=cleanup", func(t *testing.T) {
id := x.NewUUID()
now := time.Now().Add(-24 * time.Hour).UTC().Truncate(time.Second)
yesterday := time.Now().Add(-24 * time.Hour).UTC().Truncate(time.Second)
m := sqlxx.NullJSONRawMessage(`{"foo": "bar"}`)
expected := continuity.Container{Name: "foo", IdentityID: x.PointToUUID(createIdentity(t).ID),
ExpiresAt: now,
ExpiresAt: yesterday,
Payload: m,
}
expected.ID = id
Expand Down
4 changes: 2 additions & 2 deletions driver/config/config.go
Expand Up @@ -1077,11 +1077,11 @@ func (p *Config) SelfServiceLinkMethodBaseURL() *url.URL {
}

func (p *Config) DatabaseCleanupSleepTables() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupSleepTables, 5*time.Second)
return p.p.Duration(ViperKeyDatabaseCleanupSleepTables)
}

func (p *Config) DatabaseCleanupBatchSize() int {
return p.p.IntF(ViperKeyDatabaseCleanupBatchSize, 100)
return p.p.Int(ViperKeyDatabaseCleanupBatchSize)
}

func (p *Config) SelfServiceFlowRecoveryAfterHooks(strategy string) []SelfServiceHook {
Expand Down
6 changes: 3 additions & 3 deletions driver/config/config_test.go
Expand Up @@ -1159,10 +1159,10 @@ func TestCleanup(t *testing.T) {

t.Run("group=cleanup config", func(t *testing.T) {
assert.Equal(t, p.DatabaseCleanupSleepTables(), 1*time.Minute)
p.MustSet(config.ViperKeyDatabaseCleanupSleepTables, time.Second)
p.MustSet(config.ViperKeyDatabaseCleanupSleepTables, "1s")
assert.Equal(t, p.DatabaseCleanupSleepTables(), time.Second)
assert.Equal(t, p.DatabaseCleanupBatchSize(), 100)
p.MustSet(config.ViperKeyDatabaseCleanupBatchSize, 1)
assert.Equal(t, p.DatabaseCleanupSleepTables(), 1)
p.MustSet(config.ViperKeyDatabaseCleanupBatchSize, "1")
assert.Equal(t, p.DatabaseCleanupBatchSize(), 1)
})
}
3 changes: 2 additions & 1 deletion persistence/sql/persister.go
Expand Up @@ -185,7 +185,8 @@ func (p *Persister) CleanupDatabase(ctx context.Context, wait time.Duration, old
}
time.Sleep(wait)

p.r.Logger().Println("Successfully cleaned up the SQL database!")
p.r.Logger().Println("Successfully cleaned up the latest batch of the SQL database! " +
"This should be re-run periodically, to be sure that all expired data is purged.")
return nil
}

Expand Down
138 changes: 138 additions & 0 deletions persistence/sql/persister_cleanup_test.go
@@ -0,0 +1,138 @@
package sql_test

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/ory/kratos/internal"
)

func TestPersister_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
ctx := context.Background()

t.Run("case=should not throw error on cleanup", func(t *testing.T) {
assert.Nil(t, p.CleanupDatabase(ctx, 0, 0, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.CleanupDatabase(ctx, 0, 0, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Continuity_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup continuity sessions", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredContinuitySessions(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup continuity sessions", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredContinuitySessions(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Login_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup login flows", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredLoginFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup login flows", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredLoginFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Recovery_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup recovery flows", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredRecoveryFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup recovery flows", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredRecoveryFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Registration_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup registration flows", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredRegistrationFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup registration flows", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredRegistrationFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Session_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup sessions", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredSessions(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup sessions", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredSessions(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Settings_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup setting flows", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredSettingsFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup setting flows", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredSettingsFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

func TestPersister_Verification_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()
currentTime := time.Now()
ctx := context.Background()

t.Run("case=should not throw error on cleanup verification flows", func(t *testing.T) {
assert.Nil(t, p.DeleteExpiredVerificationFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})

t.Run("case=should throw error on cleanup verification flows", func(t *testing.T) {
p.GetConnection(ctx).Close()
assert.Error(t, p.DeleteExpiredVerificationFlows(ctx, currentTime, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}
9 changes: 0 additions & 9 deletions persistence/sql/persister_test.go
Expand Up @@ -315,12 +315,3 @@ func TestPersister_Transaction(t *testing.T) {
assert.Equal(t, sqlcon.ErrNoRows.Error(), err.Error())
})
}

func TestPersister_Cleanup(t *testing.T) {
_, reg := internal.NewFastRegistryWithMocks(t)
p := reg.Persister()

t.Run("case=should not throw error on cleanup", func(t *testing.T) {
assert.Nil(t, p.CleanupDatabase(context.Background(), 0, 0, reg.Config(context.Background()).DatabaseCleanupBatchSize()))
})
}

0 comments on commit 0580b63

Please sign in to comment.