Skip to content

Commit

Permalink
Remove expired data from the database, ISSUE-952
Browse files Browse the repository at this point in the history
  • Loading branch information
abador committed Apr 22, 2022
1 parent 7f87bca commit df0f175
Show file tree
Hide file tree
Showing 23 changed files with 379 additions and 0 deletions.
21 changes: 21 additions & 0 deletions cmd/cleanup/root.go
@@ -0,0 +1,21 @@
package cleanup

import (
"github.com/ory/x/configx"
"github.com/spf13/cobra"
)

func NewCleanupCmd() *cobra.Command {
c := &cobra.Command{
Use: "cleanup",
Short: "Various cleanup helpers",
}
configx.RegisterFlags(c.PersistentFlags())
return c
}

func RegisterCommandRecursive(parent *cobra.Command) {
c := NewCleanupCmd()
parent.AddCommand(c)
c.AddCommand(NewCleanupSQLCmd())
}
49 changes: 49 additions & 0 deletions cmd/cleanup/sql.go
@@ -0,0 +1,49 @@
/*
Copyright © 2019 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cleanup

import (
"time"

"github.com/ory/kratos/driver/config"
"github.com/spf13/cobra"

"github.com/ory/kratos/cmd/cliclient"
"github.com/ory/x/configx"
)

// cleanupSqlCmd represents the sql command
func NewCleanupSQLCmd() *cobra.Command {
c := &cobra.Command{
Use: "sql <database-url>",
Short: "Cleanup sql database from expired flows and sessions",
Long: `Run this command as frequently as you need.
It is recommended to run this command close to the SQL instance (e.g. same subnet) instead of over the public internet.
This decreases risk of failure and decreases time required.
You can read in the database URL using the -e flag, for example:
export DSN=...
kratos cleanup sql -e
### WARNING ###
Before running this command on an existing database, create a back up!
`,
Run: func(cmd *cobra.Command, args []string) {
cliclient.NewCleanupHandler().CleanupSQL(cmd, args)
},
}

configx.RegisterFlags(c.PersistentFlags())
c.Flags().BoolP("read-from-env", "e", true, "If set, reads the database connection string from the environment variable DSN or config file key dsn.")
c.Flags().IntP(config.ViperKeyDatabaseCleanupBatchSize, "b", 100, "Set the number of records to be cleaned per run")
c.Flags().Duration(config.ViperKeyDatabaseCleanupSleepTables, time.Minute, "How long to wait between each table cleanup")
return c
}
59 changes: 59 additions & 0 deletions cmd/cliclient/cleanup.go
@@ -0,0 +1,59 @@
package cliclient

import (
"fmt"
"os"

"github.com/ory/x/configx"

"github.com/spf13/cobra"

"github.com/ory/kratos/driver"
"github.com/ory/kratos/driver/config"
"github.com/ory/x/cmdx"
"github.com/ory/x/flagx"
)

type CleanupHandler struct{}

func NewCleanupHandler() *CleanupHandler {
return &CleanupHandler{}
}

func (h *CleanupHandler) CleanupSQL(cmd *cobra.Command, args []string) {
var d driver.Registry

if flagx.MustGetBool(cmd, "read-from-env") {
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation())
if len(d.Config(cmd.Context()).DSN()) == 0 {
fmt.Println(cmd.UsageString())
fmt.Println("")
fmt.Println("When using flag -e, environment variable DSN must be set")
os.Exit(1)
return
}
} else {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
os.Exit(1)
return
}
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation(),
configx.WithValue(config.ViperKeyDSN, args[0]))
}

err := d.Init(cmd.Context(), driver.SkipNetworkInit)
cmdx.Must(err, "An error occurred initializing migrations: %s", err)

err = d.Persister().CleanupDatabase(cmd.Context(), d.Config(cmd.Context()).DatabaseCleanupSleepTables(), d.Config(cmd.Context()).DatabaseCleanupOlderThan())
cmdx.Must(err, "An error occurred while cleaning up expired data: %s", err)

}
3 changes: 3 additions & 0 deletions cmd/root.go
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"

"github.com/ory/kratos/cmd/cleanup"

"github.com/ory/kratos/driver/config"

"github.com/ory/kratos/cmd/courier"
Expand Down Expand Up @@ -36,6 +38,7 @@ func NewRootCmd() (cmd *cobra.Command) {
cmd.AddCommand(identities.NewListCmd(cmd))
migrate.RegisterCommandRecursive(cmd)
serve.RegisterCommandRecursive(cmd)
cleanup.RegisterCommandRecursive(cmd)
remote.RegisterCommandRecursive(cmd)
cmd.AddCommand(identities.NewValidateCmd())
cmd.AddCommand(cmdx.Version(&config.Version, &config.Commit, &config.Date))
Expand Down
2 changes: 2 additions & 0 deletions continuity/persistence.go
Expand Up @@ -2,6 +2,7 @@ package continuity

import (
"context"
"time"

"github.com/gofrs/uuid"
)
Expand All @@ -14,4 +15,5 @@ type Persister interface {
SaveContinuitySession(ctx context.Context, c *Container) error
GetContinuitySession(ctx context.Context, id uuid.UUID) (*Container, error)
DeleteContinuitySession(ctx context.Context, id uuid.UUID) error
DeleteExpiredContinuitySessions(context.Context, time.Time, int) error
}
17 changes: 17 additions & 0 deletions driver/config/config.go
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/santhosh-tekuri/jsonschema"

"github.com/ory/jsonschema/v3/httploader"
"github.com/ory/x/httpx"

Expand Down Expand Up @@ -153,6 +155,9 @@ const (
ViperKeyHasherArgon2ConfigDedicatedMemory = "hashers.argon2.dedicated_memory"
ViperKeyHasherBcryptCost = "hashers.bcrypt.cost"
ViperKeyCipherAlgorithm = "ciphers.algorithm"
ViperKeyDatabaseCleanupBatchSize = "database.cleanup.batch_size"
ViperKeyDatabaseCleanupOlderThan = "database.cleanup.older_than"
ViperKeyDatabaseCleanupSleepTables = "database.cleanup.sleep.tables"
ViperKeyLinkLifespan = "selfservice.methods.link.config.lifespan"
ViperKeyLinkBaseURL = "selfservice.methods.link.config.base_url"
ViperKeyPasswordHaveIBeenPwnedHost = "selfservice.methods.password.config.haveibeenpwned_host"
Expand Down Expand Up @@ -1080,6 +1085,18 @@ func (p *Config) SelfServiceLinkMethodBaseURL() *url.URL {
return p.p.RequestURIF(ViperKeyLinkBaseURL, p.SelfPublicURL())
}

func (p *Config) DatabaseCleanupBatchSize() int {
return p.p.IntF(ViperKeyDatabaseCleanupBatchSize, 100)
}

func (p *Config) DatabaseCleanupSleepTables() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupSleepTables, 1*time.Minute)
}

func (p *Config) DatabaseCleanupOlderThan() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupOlderThan, 0)
}

func (p *Config) SelfServiceFlowRecoveryAfterHooks(strategy string) []SelfServiceHook {
return p.selfServiceHooks(HookStrategyKey(ViperKeySelfServiceRecoveryAfter, strategy))
}
Expand Down
43 changes: 43 additions & 0 deletions embedx/config.schema.json
Expand Up @@ -1433,6 +1433,49 @@
}
}
},
"database": {
"type": "object",
"title": "Database related configuration",
"description": "Miscellaneous settings used in database related tasks (cleanup, etc.)",
"properties": {
"cleanup": {
"type": "object",
"title": "Database cleanup settings",
"description": "Settings that controls how the database cleanup process is configured (delays, batch size, etc.)",
"properties": {
"batch_size" : {
"type": "integer",
"title": "Number of records to clean in one iteration",
"description": "Controls how many records should be purged from one table during database cleanup task",
"minimum": 1,
"default": 100
},
"sleep": {
"type": "object",
"title": "Delays between various database cleanup phases",
"description": "Configures delays between each step of the cleanup process. It is useful to tune the process so it will be efficient and performant.",
"properties": {
"tables": {
"type": "string",
"title": "Delay between each table cleanups",
"description": "Controls the delay time between cleaning each table in one cleanup iteration",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "1m"
}
}
},
"older_than": {
"type": "string",
"title": "Remove records older than",
"description": "Controls how old records do we want to leave",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "0s"
}
}
}
},
"additionalProperties": false
},
"dsn": {
"type": "string",
"title": "Data Source Name",
Expand Down
4 changes: 4 additions & 0 deletions internal/driver.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/ory/kratos/corp"

Expand Down Expand Up @@ -45,6 +46,9 @@ func NewConfigurationWithDefaults(t *testing.T) *config.Config {
config.ViperKeyCourierSMTPURL: "smtp://foo:bar@baz.com/",
config.ViperKeySelfServiceBrowserDefaultReturnTo: "https://www.ory.sh/redirect-not-set",
config.ViperKeySecretsCipher: []string{"secret-thirty-two-character-long"},
config.ViperKeyDatabaseCleanupBatchSize: 100,
config.ViperKeyDatabaseCleanupSleepTables: 1 * time.Minute,
config.ViperKeyDatabaseCleanupOlderThan: 1 * time.Minute,
}),
configx.SkipValidation(),
)
Expand Down
2 changes: 2 additions & 0 deletions persistence/reference.go
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"time"

"github.com/ory/x/networkx"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Persister interface {
link.RecoveryTokenPersister
link.VerificationTokenPersister

CleanupDatabase(context.Context, time.Duration, time.Duration) error
Close(context.Context) error
Ping() error
MigrationStatus(c context.Context) (popx.MigrationStatuses, error)
Expand Down
52 changes: 52 additions & 0 deletions persistence/sql/persister.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"fmt"
"time"

"github.com/ory/x/fsx"

Expand Down Expand Up @@ -135,6 +136,57 @@ type node interface {
GetNID() uuid.UUID
}

func (p *Persister) CleanupDatabase(ctx context.Context, wait time.Duration, older time.Duration) error {
currentTime := time.Now().Add(-older)
deleteLimit := p.r.Config(ctx).DatabaseCleanupBatchSize()
p.r.Logger().Printf("Cleaning up first %d records older than %s\n", deleteLimit, currentTime)

p.r.Logger().Println("Cleaning up expired sessions")
if err := p.DeleteExpiredSessions(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired continuity containers")
if err := p.DeleteExpiredContinuitySessions(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired login flows")
if err := p.DeleteExpiredLoginFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired recovery flows")
if err := p.DeleteExpiredRecoveryFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired registation flows")
if err := p.DeleteExpiredRegistrationFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired settings flows")
if err := p.DeleteExpiredSettingsFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired verification flows")
if err := p.DeleteExpiredVerificationFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Successfully cleaned up the SQL database!")
return nil
}

func (p *Persister) update(ctx context.Context, v node, columnNames ...string) error {
c := p.GetConnection(ctx)
quoter, ok := c.Dialect.(quotable)
Expand Down
16 changes: 16 additions & 0 deletions persistence/sql/persister_continuity.go
Expand Up @@ -3,6 +3,7 @@ package sql
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -41,3 +42,18 @@ func (p *Persister) DeleteContinuitySession(ctx context.Context, id uuid.UUID) e
}
return nil
}

func (p *Persister) DeleteExpiredContinuitySessions(ctx context.Context, expiresAt time.Time, limit int) error {
// #nosec G201
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ? LIMIT ?",
new(continuity.Container).TableName(ctx),
),
expiresAt,
limit,
).Exec()
if err != nil {
return sqlcon.HandleError(err)
}
return nil
}

0 comments on commit df0f175

Please sign in to comment.