Skip to content

Commit

Permalink
Remove expired data from the database, ISSUE-952
Browse files Browse the repository at this point in the history
  • Loading branch information
abador committed Apr 19, 2022
1 parent 7f87bca commit 0b815c7
Show file tree
Hide file tree
Showing 23 changed files with 354 additions and 0 deletions.
33 changes: 33 additions & 0 deletions cmd/cleanup/background.go
@@ -0,0 +1,33 @@
package cleanup

import (
cx "context"
"time"

"github.com/ory/graceful"
"github.com/ory/kratos/driver"
)

func BackgroundCleanup(ctx cx.Context, r driver.Registry) {
ctx, cancel := cx.WithCancel(ctx)

r.Logger().Println("Cleanup worker started.")
if err := graceful.Graceful(func() error {
for {
select {
case <-time.After(r.Config(ctx).DatabaseCleanupSleepBackground()):
err := r.Persister().CleanupDatabase(ctx, r.Config(ctx).DatabaseCleanupSleepTables())
r.Logger().Error(err)
case <-ctx.Done():
return nil
}
}
}, func(_ cx.Context) error {
cancel()
return nil
}); err != nil {
r.Logger().WithError(err).Fatalf("Failed to run cleanup worker.")
}

r.Logger().Println("Background cleanup worker was shutdown gracefully.")
}
21 changes: 21 additions & 0 deletions cmd/cleanup/root.go
@@ -0,0 +1,21 @@
package cleanup

import (
"github.com/ory/x/configx"
"github.com/spf13/cobra"
)

func NewCleanupCmd() *cobra.Command {
c := &cobra.Command{
Use: "cleanup",
Short: "Various cleanup helpers",
}
configx.RegisterFlags(c.PersistentFlags())
return c
}

func RegisterCommandRecursive(parent *cobra.Command) {
c := NewCleanupCmd()
parent.AddCommand(c)
c.AddCommand(NewCleanupSQLCmd())
}
50 changes: 50 additions & 0 deletions cmd/cleanup/sql.go
@@ -0,0 +1,50 @@
/*
Copyright © 2019 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cleanup

import (
"time"

"github.com/ory/kratos/driver/config"
"github.com/spf13/cobra"

"github.com/ory/kratos/cmd/cliclient"
"github.com/ory/x/configx"
)

// cleanupSqlCmd represents the sql command
func NewCleanupSQLCmd() *cobra.Command {
c := &cobra.Command{
Use: "sql <database-url>",
Short: "Cleanup sql database from expired flows and sessions",
Long: `Run this command as frequently as you need.
It is recommended to run this command close to the SQL instance (e.g. same subnet) instead of over the public internet.
This decreases risk of failure and decreases time required.
You can read in the database URL using the -e flag, for example:
export DSN=...
kratos cleanup sql -e
### WARNING ###
Before running this command on an existing database, create a back up!
`,
Run: func(cmd *cobra.Command, args []string) {
cliclient.NewCleanupHandler().CleanupSQL(cmd, args)
},
}

configx.RegisterFlags(c.PersistentFlags())
c.Flags().BoolP("read-from-env", "e", false, "If set, reads the database connection string from the environment variable DSN or config file key dsn.")
c.Flags().IntP(config.ViperKeyDatabaseCleanupBatchSize, "b", 100, "Set the number of records to be cleaned per run")
c.Flags().Duration(config.ViperKeyDatabaseCleanupSleepBackground, 30*time.Minute, "How long to wait between each cleanup run")
c.Flags().Duration(config.ViperKeyDatabaseCleanupSleepTables, time.Minute, "How long to wait between each table cleanup")
return c
}
3 changes: 3 additions & 0 deletions cmd/root.go
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"

"github.com/ory/kratos/cmd/cleanup"

"github.com/ory/kratos/driver/config"

"github.com/ory/kratos/cmd/courier"
Expand Down Expand Up @@ -36,6 +38,7 @@ func NewRootCmd() (cmd *cobra.Command) {
cmd.AddCommand(identities.NewListCmd(cmd))
migrate.RegisterCommandRecursive(cmd)
serve.RegisterCommandRecursive(cmd)
cleanup.RegisterCommandRecursive(cmd)
remote.RegisterCommandRecursive(cmd)
cmd.AddCommand(identities.NewValidateCmd())
cmd.AddCommand(cmdx.Version(&config.Version, &config.Commit, &config.Date))
Expand Down
2 changes: 2 additions & 0 deletions continuity/persistence.go
Expand Up @@ -2,6 +2,7 @@ package continuity

import (
"context"
"time"

"github.com/gofrs/uuid"
)
Expand All @@ -14,4 +15,5 @@ type Persister interface {
SaveContinuitySession(ctx context.Context, c *Container) error
GetContinuitySession(ctx context.Context, id uuid.UUID) (*Container, error)
DeleteContinuitySession(ctx context.Context, id uuid.UUID) error
DeleteExpiredContinuitySessions(context.Context, time.Time, int) error
}
17 changes: 17 additions & 0 deletions driver/config/config.go
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/santhosh-tekuri/jsonschema"

"github.com/ory/jsonschema/v3/httploader"
"github.com/ory/x/httpx"

Expand Down Expand Up @@ -153,6 +155,9 @@ const (
ViperKeyHasherArgon2ConfigDedicatedMemory = "hashers.argon2.dedicated_memory"
ViperKeyHasherBcryptCost = "hashers.bcrypt.cost"
ViperKeyCipherAlgorithm = "ciphers.algorithm"
ViperKeyDatabaseCleanupBatchSize = "database.cleanup.batch_size"
ViperKeyDatabaseCleanupSleepBackground = "database.cleanup.sleep.background"
ViperKeyDatabaseCleanupSleepTables = "database.cleanup.sleep.tables"
ViperKeyLinkLifespan = "selfservice.methods.link.config.lifespan"
ViperKeyLinkBaseURL = "selfservice.methods.link.config.base_url"
ViperKeyPasswordHaveIBeenPwnedHost = "selfservice.methods.password.config.haveibeenpwned_host"
Expand Down Expand Up @@ -1080,6 +1085,18 @@ func (p *Config) SelfServiceLinkMethodBaseURL() *url.URL {
return p.p.RequestURIF(ViperKeyLinkBaseURL, p.SelfPublicURL())
}

func (p *Config) DatabaseCleanupBatchSize() int {
return p.p.IntF(ViperKeyDatabaseCleanupBatchSize, 100)
}

func (p *Config) DatabaseCleanupSleepBackground() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupSleepBackground, 30*time.Minute)
}

func (p *Config) DatabaseCleanupSleepTables() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupSleepTables, 1*time.Minute)
}

func (p *Config) SelfServiceFlowRecoveryAfterHooks(strategy string) []SelfServiceHook {
return p.selfServiceHooks(HookStrategyKey(ViperKeySelfServiceRecoveryAfter, strategy))
}
Expand Down
43 changes: 43 additions & 0 deletions embedx/config.schema.json
Expand Up @@ -1433,6 +1433,49 @@
}
}
},
"database": {
"type": "object",
"title": "Database related configuration",
"description": "Miscellaneous settings used in database related tasks (cleanup, etc.)",
"properties": {
"cleanup": {
"type": "object",
"title": "Database cleanup settings",
"description": "Settings that controls how the database cleanup process is configured (delays, batch size, etc.)",
"properties": {
"batch_size" : {
"type": "integer",
"title": "Number of records to clean in one iteration",
"description": "Controls how many records should be purged from one table during database cleanup task",
"minimum": 1,
"default": 100
},
"sleep": {
"type": "object",
"title": "Delays between various database cleanup phases",
"description": "Configures delays between each step of the cleanup process. It is useful to tune the process so it will be efficient and performant.",
"properties": {
"background": {
"type": "string",
"title": "Delay between each background runs",
"description": "When running the task in the background this parameter controls how long to wait before staring a new database cleanup iteration",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "30m"
},
"tables": {
"type": "string",
"title": "Delay between each table cleanups",
"description": "Controls the delay time between cleaning each table in one cleanup iteration",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "1m"
}
}
}
}
}
},
"additionalProperties": false
},
"dsn": {
"type": "string",
"title": "Data Source Name",
Expand Down
4 changes: 4 additions & 0 deletions internal/driver.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/ory/kratos/corp"

Expand Down Expand Up @@ -45,6 +46,9 @@ func NewConfigurationWithDefaults(t *testing.T) *config.Config {
config.ViperKeyCourierSMTPURL: "smtp://foo:bar@baz.com/",
config.ViperKeySelfServiceBrowserDefaultReturnTo: "https://www.ory.sh/redirect-not-set",
config.ViperKeySecretsCipher: []string{"secret-thirty-two-character-long"},
config.ViperKeyDatabaseCleanupBatchSize: 100,
config.ViperKeyDatabaseCleanupSleepBackground: 30 * time.Minute,
config.ViperKeyDatabaseCleanupSleepTables: 1 * time.Minute,
}),
configx.SkipValidation(),
)
Expand Down
2 changes: 2 additions & 0 deletions persistence/reference.go
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"time"

"github.com/ory/x/networkx"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Persister interface {
link.RecoveryTokenPersister
link.VerificationTokenPersister

CleanupDatabase(context.Context, time.Duration) error
Close(context.Context) error
Ping() error
MigrationStatus(c context.Context) (popx.MigrationStatuses, error)
Expand Down
52 changes: 52 additions & 0 deletions persistence/sql/persister.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"fmt"
"time"

"github.com/ory/x/fsx"

Expand Down Expand Up @@ -135,6 +136,57 @@ type node interface {
GetNID() uuid.UUID
}

func (p *Persister) CleanupDatabase(ctx context.Context, wait time.Duration) error {
currentTime := time.Now()
deleteLimit := p.r.Config(ctx).DatabaseCleanupBatchSize()
p.r.Logger().Printf("Cleaning up first %d records older than %s\n", deleteLimit, currentTime)

p.r.Logger().Println("Cleaning up expired sessions")
if err := p.DeleteExpiredSessions(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired continuity containers")
if err := p.DeleteExpiredContinuitySessions(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired login flows")
if err := p.DeleteExpiredLoginFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired recovery flows")
if err := p.DeleteExpiredRecoveryFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired registation flows")
if err := p.DeleteExpiredRegistrationFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired settings flows")
if err := p.DeleteExpiredSettingsFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired verification flows")
if err := p.DeleteExpiredVerificationFlows(ctx, currentTime, deleteLimit); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Successfully cleaned up the SQL database!")
return nil
}

func (p *Persister) update(ctx context.Context, v node, columnNames ...string) error {
c := p.GetConnection(ctx)
quoter, ok := c.Dialect.(quotable)
Expand Down
16 changes: 16 additions & 0 deletions persistence/sql/persister_continuity.go
Expand Up @@ -3,6 +3,7 @@ package sql
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -41,3 +42,18 @@ func (p *Persister) DeleteContinuitySession(ctx context.Context, id uuid.UUID) e
}
return nil
}

func (p *Persister) DeleteExpiredContinuitySessions(ctx context.Context, expiresAt time.Time, limit int) error {
// #nosec G201
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ? LIMIT ?",
new(continuity.Container).TableName(ctx),
),
expiresAt,
limit,
).Exec()
if err != nil {
return sqlcon.HandleError(err)
}
return nil
}
17 changes: 17 additions & 0 deletions persistence/sql/persister_login.go
Expand Up @@ -2,6 +2,8 @@ package sql

import (
"context"
"fmt"
"time"

"github.com/ory/kratos/corp"

Expand Down Expand Up @@ -51,3 +53,18 @@ func (p *Persister) ForceLoginFlow(ctx context.Context, id uuid.UUID) error {
return tx.Save(lr, "nid")
})
}

func (p *Persister) DeleteExpiredLoginFlows(ctx context.Context, expiresAt time.Time, limit int) error {
// #nosec G201
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ? LIMIT ?",
new(login.Flow).TableName(ctx),
),
expiresAt,
limit,
).Exec()
if err != nil {
return sqlcon.HandleError(err)
}
return nil
}
15 changes: 15 additions & 0 deletions persistence/sql/persister_recovery.go
Expand Up @@ -93,3 +93,18 @@ func (p *Persister) DeleteRecoveryToken(ctx context.Context, token string) error
/* #nosec G201 TableName is static */
return p.GetConnection(ctx).RawQuery(fmt.Sprintf("DELETE FROM %s WHERE token=? AND nid = ?", new(link.RecoveryToken).TableName(ctx)), token, corp.ContextualizeNID(ctx, p.nid)).Exec()
}

func (p *Persister) DeleteExpiredRecoveryFlows(ctx context.Context, expiresAt time.Time, limit int) error {
// #nosec G201
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ? LIMIT ?",
new(recovery.Flow).TableName(ctx),
),
expiresAt,
limit,
).Exec()
if err != nil {
return sqlcon.HandleError(err)
}
return nil
}

0 comments on commit 0b815c7

Please sign in to comment.