Skip to content

Commit

Permalink
Change to database pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
zugao committed Oct 4, 2022
1 parent fd44c3e commit eb9cd92
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 20 deletions.
84 changes: 67 additions & 17 deletions pkg/database/database.go
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"github.com/appuio/appuio-cloud-reporting/pkg/db"
pipeline "github.com/ccremer/go-command-pipeline"
"github.com/jmoiron/sqlx"
"github.com/vshn/cloudscale-metrics-collector/pkg/categoriesmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/datetimesmodel"
Expand Down Expand Up @@ -58,8 +59,20 @@ type AggregatedBucket struct {

// Database holds raw url of the postgresql database with the opened connection
type Database struct {
Url string
connection *sqlx.DB
Url string
billingDate time.Time

namespace *string
aggregatedBucket *AggregatedBucket
connection *sqlx.DB
transaction *sqlx.Tx
tenant *db.Tenant
category *db.Category
dateTime *db.DateTime
product *db.Product
discount *db.Discount
query *db.Query
quantity *float64
}

// ValidateDatabaseURL validates environment variable dbUrlEnvVariable which holds the database URL
Expand Down Expand Up @@ -97,58 +110,95 @@ func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggr
log := ctrl.LoggerFrom(ctx)
log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed)

// start new transaction for actual work
d.namespace = &namespace
d.billingDate = billingDate
d.aggregatedBucket = &aggregatedBucket
p := pipeline.NewPipeline[context.Context]()
p.WithSteps(
p.NewStep("Begin database transaction", d.beginTransaction),
p.NewStep("Ensure necessary models", d.ensureModels),
p.NewStep("Get best match", d.getBestMatch),
p.NewStep("Save facts", d.saveFacts),
p.NewStep("Commit transaction", d.commitTransaction),
)
return p.RunWithContext(ctx)
}

func (d *Database) beginTransaction(ctx context.Context) error {
tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("cannot create database transaction for namespace %s: %w", namespace, err)
return fmt.Errorf("cannot create database transaction for namespace %s: %w", d.namespace, err)
}
d.transaction = tx
return nil
}

tenant, err := tenantsmodel.Ensure(ctx, tx, &db.Tenant{Source: aggregatedBucket.Organization})
func (d *Database) ensureModels(ctx context.Context) error {
namespace := *d.namespace
tenant, err := tenantsmodel.Ensure(ctx, d.transaction, &db.Tenant{Source: d.aggregatedBucket.Organization})
if err != nil {
return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err)
}
d.tenant = tenant

category, err := categoriesmodel.Ensure(ctx, tx, &db.Category{Source: provider + ":" + namespace})
category, err := categoriesmodel.Ensure(ctx, d.transaction, &db.Category{Source: provider + ":" + namespace})
if err != nil {
return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err)
}
d.category = category

dateTime := datetimesmodel.New(billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, tx, dateTime)
dateTime := datetimesmodel.New(d.billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, d.transaction, dateTime)
if err != nil {
return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err)
}
d.dateTime = dateTime
return nil
}

product, err := productsmodel.GetBestMatch(ctx, tx, getSourceString(namespace, aggregatedBucket.Organization), billingDate)
func (d *Database) getBestMatch(ctx context.Context) error {
namespace := *d.namespace
productMatch, err := productsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
if err != nil {
return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err)
}
d.product = productMatch

discount, err := discountsmodel.GetBestMatch(ctx, tx, getSourceString(namespace, aggregatedBucket.Organization), billingDate)
discountMatch, err := discountsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
if err != nil {
return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err)
}
d.discount = discountMatch

query, err := queriesmodel.GetByName(ctx, tx, queryAndZone)
queryMatch, err := queriesmodel.GetByName(ctx, d.transaction, queryAndZone)
if err != nil {
return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err)
}
d.query = queryMatch

var quantity float64
if query.Unit == defaultUnit {
quantity = float64(aggregatedBucket.StorageUsed) / 1000 / 1000 / 1000
quantity = float64(d.aggregatedBucket.StorageUsed) / 1000 / 1000 / 1000
} else {
return fmt.Errorf("unknown query unit %s", query.Unit)
}
storageFact := factsmodel.New(dateTime, query, tenant, category, product, discount, quantity)
_, err = factsmodel.Ensure(ctx, tx, storageFact)
d.quantity = &quantity
return nil
}

func (d *Database) saveFacts(ctx context.Context) error {
storageFact := factsmodel.New(d.dateTime, d.query, d.tenant, d.category, d.product, d.discount, *d.quantity)
_, err := factsmodel.Ensure(ctx, d.transaction, storageFact)
if err != nil {
return fmt.Errorf("cannot save fact for namespace %s: %w", namespace, err)
return fmt.Errorf("cannot save fact for namespace %s: %w", d.namespace, err)
}
return nil
}

err = tx.Commit()
func (d *Database) commitTransaction(_ context.Context) error {
err := d.transaction.Commit()
if err != nil {
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", namespace, err)
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", d.namespace, err)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sos/objectstorage.go
Expand Up @@ -104,9 +104,9 @@ func (o *ObjectStorage) saveToDatabase(ctx context.Context) error {

log.V(1).Info("Saving buckets information usage to database")
for namespace, aggregatedBucket := range o.aggregatedBuckets {
e := o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate)
if e != nil {
log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", e.Error())
err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate)
if err != nil {
log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error())
}
}

Expand Down

0 comments on commit eb9cd92

Please sign in to comment.