Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove expired data from the database, ISSUE-952 #2406

Merged
merged 10 commits into from Jun 20, 2022
21 changes: 21 additions & 0 deletions cmd/cleanup/root.go
@@ -0,0 +1,21 @@
package cleanup

import (
"github.com/ory/x/configx"
"github.com/spf13/cobra"
)

func NewCleanupCmd() *cobra.Command {
c := &cobra.Command{
Use: "cleanup",
Short: "Various cleanup helpers",
}
configx.RegisterFlags(c.PersistentFlags())
return c
}

func RegisterCommandRecursive(parent *cobra.Command) {
c := NewCleanupCmd()
parent.AddCommand(c)
c.AddCommand(NewCleanupSQLCmd())
}
53 changes: 53 additions & 0 deletions cmd/cleanup/sql.go
@@ -0,0 +1,53 @@
/*
Copyright © 2019 NAME HERE <EMAIL ADDRESS>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cleanup

import (
"fmt"
"time"

"github.com/ory/kratos/driver/config"
"github.com/spf13/cobra"

"github.com/ory/kratos/cmd/cliclient"
"github.com/ory/x/configx"
)

// cleanupSqlCmd represents the sql command
func NewCleanupSQLCmd() *cobra.Command {
c := &cobra.Command{
Use: "sql <database-url>",
Short: "Cleanup sql database from expired flows and sessions",
Long: `Run this command as frequently as you need.
It is recommended to run this command close to the SQL instance (e.g. same subnet) instead of over the public internet.
This decreases risk of failure and decreases time required.
You can read in the database URL using the -e flag, for example:
export DSN=...
kratos cleanup sql -e
### WARNING ###
Before running this command on an existing database, create a back up!
`,
Run: func(cmd *cobra.Command, args []string) {
err := cliclient.NewCleanupHandler().CleanupSQL(cmd, args)
if err != nil {
fmt.Fprintln(cmd.OutOrStdout(), err)
}
},
abador marked this conversation as resolved.
Show resolved Hide resolved
}

configx.RegisterFlags(c.PersistentFlags())
c.Flags().BoolP("read-from-env", "e", true, "If set, reads the database connection string from the environment variable DSN or config file key dsn.")
c.Flags().Duration(config.ViperKeyDatabaseCleanupSleepTables, time.Minute, "How long to wait between each table cleanup")
abador marked this conversation as resolved.
Show resolved Hide resolved
c.Flags().Duration("keep-last", 0, "Don't remove records younger than")
return c
}
65 changes: 65 additions & 0 deletions cmd/cliclient/cleanup.go
@@ -0,0 +1,65 @@
package cliclient

import (
"fmt"

"github.com/pkg/errors"

"github.com/ory/x/configx"

"github.com/spf13/cobra"

"github.com/ory/kratos/driver"
"github.com/ory/kratos/driver/config"
"github.com/ory/x/cmdx"
"github.com/ory/x/flagx"
)

type CleanupHandler struct{}

func NewCleanupHandler() *CleanupHandler {
return &CleanupHandler{}
}

func (h *CleanupHandler) CleanupSQL(cmd *cobra.Command, args []string) error {
var d driver.Registry

if flagx.MustGetBool(cmd, "read-from-env") {
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation())
if len(d.Config(cmd.Context()).DSN()) == 0 {
fmt.Fprintln(cmd.OutOrStdout(), cmd.UsageString())
fmt.Fprintln(cmd.OutOrStdout(), "")
fmt.Fprintln(cmd.OutOrStdout(), "When using flag -e, environment variable DSN must be set")
return cmdx.FailSilently(cmd)
}
} else {
if len(args) != 1 {
fmt.Println(cmd.UsageString())
return cmdx.FailSilently(cmd)
}
d = driver.NewWithoutInit(
cmd.Context(),
cmd.ErrOrStderr(),
configx.WithFlags(cmd.Flags()),
configx.SkipValidation(),
configx.WithValue(config.ViperKeyDSN, args[0]))
}
abador marked this conversation as resolved.
Show resolved Hide resolved

err := d.Init(cmd.Context(), driver.SkipNetworkInit)
if err != nil {
return errors.Wrap(err, "An error occurred initializing cleanup")
}

keepLast := flagx.MustGetDuration(cmd, "keep-last")

err = d.Persister().CleanupDatabase(cmd.Context(), d.Config(cmd.Context()).DatabaseCleanupSleepTables(), keepLast)
if err != nil {
return errors.Wrap(err, "An error occurred while cleaning up expired data")
}

return nil
}
3 changes: 3 additions & 0 deletions cmd/root.go
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"os"

"github.com/ory/kratos/cmd/cleanup"

"github.com/ory/kratos/driver/config"

"github.com/ory/kratos/cmd/courier"
Expand Down Expand Up @@ -36,6 +38,7 @@ func NewRootCmd() (cmd *cobra.Command) {
cmd.AddCommand(identities.NewListCmd(cmd))
migrate.RegisterCommandRecursive(cmd)
serve.RegisterCommandRecursive(cmd)
cleanup.RegisterCommandRecursive(cmd)
remote.RegisterCommandRecursive(cmd)
cmd.AddCommand(identities.NewValidateCmd())
cmd.AddCommand(cmdx.Version(&config.Version, &config.Commit, &config.Date))
Expand Down
2 changes: 2 additions & 0 deletions continuity/persistence.go
Expand Up @@ -2,6 +2,7 @@ package continuity

import (
"context"
"time"

"github.com/gofrs/uuid"
)
Expand All @@ -14,4 +15,5 @@ type Persister interface {
SaveContinuitySession(ctx context.Context, c *Container) error
GetContinuitySession(ctx context.Context, id uuid.UUID) (*Container, error)
DeleteContinuitySession(ctx context.Context, id uuid.UUID) error
DeleteExpiredContinuitySessions(context.Context, time.Time) error
}
7 changes: 7 additions & 0 deletions driver/config/config.go
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"github.com/santhosh-tekuri/jsonschema"

"github.com/ory/jsonschema/v3/httploader"
"github.com/ory/x/httpx"

Expand Down Expand Up @@ -153,6 +155,7 @@ const (
ViperKeyHasherArgon2ConfigDedicatedMemory = "hashers.argon2.dedicated_memory"
ViperKeyHasherBcryptCost = "hashers.bcrypt.cost"
ViperKeyCipherAlgorithm = "ciphers.algorithm"
ViperKeyDatabaseCleanupSleepTables = "database.cleanup.sleep.tables"
ViperKeyLinkLifespan = "selfservice.methods.link.config.lifespan"
ViperKeyLinkBaseURL = "selfservice.methods.link.config.base_url"
ViperKeyPasswordHaveIBeenPwnedHost = "selfservice.methods.password.config.haveibeenpwned_host"
Expand Down Expand Up @@ -1080,6 +1083,10 @@ func (p *Config) SelfServiceLinkMethodBaseURL() *url.URL {
return p.p.RequestURIF(ViperKeyLinkBaseURL, p.SelfPublicURL())
}

func (p *Config) DatabaseCleanupSleepTables() time.Duration {
return p.p.DurationF(ViperKeyDatabaseCleanupSleepTables, 1*time.Minute)
}

func (p *Config) SelfServiceFlowRecoveryAfterHooks(strategy string) []SelfServiceHook {
return p.selfServiceHooks(HookStrategyKey(ViperKeySelfServiceRecoveryAfter, strategy))
}
Expand Down
43 changes: 43 additions & 0 deletions embedx/config.schema.json
Expand Up @@ -1433,6 +1433,49 @@
}
}
},
"database": {
"type": "object",
"title": "Database related configuration",
"description": "Miscellaneous settings used in database related tasks (cleanup, etc.)",
"properties": {
"cleanup": {
"type": "object",
"title": "Database cleanup settings",
"description": "Settings that controls how the database cleanup process is configured (delays, batch size, etc.)",
"properties": {
"batch_size" : {
"type": "integer",
"title": "Number of records to clean in one iteration",
"description": "Controls how many records should be purged from one table during database cleanup task",
"minimum": 1,
"default": 100
},
"sleep": {
"type": "object",
"title": "Delays between various database cleanup phases",
"description": "Configures delays between each step of the cleanup process. It is useful to tune the process so it will be efficient and performant.",
"properties": {
"tables": {
"type": "string",
"title": "Delay between each table cleanups",
"description": "Controls the delay time between cleaning each table in one cleanup iteration",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "1m"
}
}
},
"older_than": {
"type": "string",
"title": "Remove records older than",
"description": "Controls how old records do we want to leave",
"pattern": "^[0-9]+(ns|us|ms|s|m|h)$",
"default": "0s"
}
}
}
},
"additionalProperties": false
},
"dsn": {
"type": "string",
"title": "Data Source Name",
Expand Down
2 changes: 2 additions & 0 deletions internal/driver.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/ory/kratos/corp"

Expand Down Expand Up @@ -45,6 +46,7 @@ func NewConfigurationWithDefaults(t *testing.T) *config.Config {
config.ViperKeyCourierSMTPURL: "smtp://foo:bar@baz.com/",
config.ViperKeySelfServiceBrowserDefaultReturnTo: "https://www.ory.sh/redirect-not-set",
config.ViperKeySecretsCipher: []string{"secret-thirty-two-character-long"},
config.ViperKeyDatabaseCleanupSleepTables: 1 * time.Minute,
}),
configx.SkipValidation(),
)
Expand Down
2 changes: 2 additions & 0 deletions persistence/reference.go
Expand Up @@ -2,6 +2,7 @@ package persistence

import (
"context"
"time"

"github.com/ory/x/networkx"

Expand Down Expand Up @@ -43,6 +44,7 @@ type Persister interface {
link.RecoveryTokenPersister
link.VerificationTokenPersister

CleanupDatabase(context.Context, time.Duration, time.Duration) error
Close(context.Context) error
Ping() error
MigrationStatus(c context.Context) (popx.MigrationStatuses, error)
Expand Down
51 changes: 51 additions & 0 deletions persistence/sql/persister.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"embed"
"fmt"
"time"

"github.com/ory/x/fsx"

Expand Down Expand Up @@ -135,6 +136,56 @@ type node interface {
GetNID() uuid.UUID
}

func (p *Persister) CleanupDatabase(ctx context.Context, wait time.Duration, older time.Duration) error {
currentTime := time.Now().Add(-older)
zepatrik marked this conversation as resolved.
Show resolved Hide resolved
p.r.Logger().Printf("Cleaning up records older than %s\n", currentTime)

p.r.Logger().Println("Cleaning up expired sessions")
if err := p.DeleteExpiredSessions(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired continuity containers")
if err := p.DeleteExpiredContinuitySessions(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired login flows")
if err := p.DeleteExpiredLoginFlows(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired recovery flows")
if err := p.DeleteExpiredRecoveryFlows(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired registation flows")
if err := p.DeleteExpiredRegistrationFlows(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired settings flows")
if err := p.DeleteExpiredSettingsFlows(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Cleaning up expired verification flows")
if err := p.DeleteExpiredVerificationFlows(ctx, currentTime); err != nil {
return err
}
time.Sleep(wait)

p.r.Logger().Println("Successfully cleaned up the SQL database!")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we did not fully clean the database but rather purged batchSize from each of the tables. If there are more expired rows, they would not have been deleted. The output should reflect that as we will have to re-run the cleanup multiple times until all data are truly cleaned.

return nil
}

func (p *Persister) update(ctx context.Context, v node, columnNames ...string) error {
c := p.GetConnection(ctx)
quoter, ok := c.Dialect.(quotable)
Expand Down
15 changes: 15 additions & 0 deletions persistence/sql/persister_continuity.go
Expand Up @@ -3,6 +3,7 @@ package sql
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -41,3 +42,17 @@ func (p *Persister) DeleteContinuitySession(ctx context.Context, id uuid.UUID) e
}
return nil
}

func (p *Persister) DeleteExpiredContinuitySessions(ctx context.Context, expiresAt time.Time) error {
// #nosec G201
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ?",
new(continuity.Container).TableName(ctx),
),
expiresAt,
).Exec()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
err := p.GetConnection(ctx).RawQuery(fmt.Sprintf(
"DELETE FROM %s WHERE expires_at <= ?",
new(continuity.Container).TableName(ctx),
),
expiresAt,
).Exec()
err := p.GetConnection(ctx)
.Where("expires_at <= ?", expiresAt)
.Delete(new(continuity.Container)),

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left the raw query since it's done the same way everywhere else. Shouldn't this be done in a follow-up with all other cases?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this query is that it will be extremely slow as it is unbound. Check out this PR for batch-based processing: https://github.com/grantzvolsky/hydra/pull/2/files#diff-6034803b09ef5017e3aa7d3082827dadb4503e6ee3a56853d49edd419a75f864

if err != nil {
return sqlcon.HandleError(err)
}
return nil
}