From 0b79ff095983442f5e18e944bbe6ef7dee3d6fee Mon Sep 17 00:00:00 2001 From: Gabriel Saratura Date: Wed, 5 Oct 2022 18:16:40 +0300 Subject: [PATCH] changed code after review --- pkg/database/database.go | 90 +++++++++++++++++++---------------- pkg/sos/objectstorage.go | 56 ++++++++-------------- pkg/sos/objectstorage_test.go | 2 +- 3 files changed, 70 insertions(+), 78 deletions(-) diff --git a/pkg/database/database.go b/pkg/database/database.go index 41c8abe..88800d0 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -58,12 +58,16 @@ type AggregatedBucket struct { // Database holds raw url of the postgresql database with the opened connection type Database struct { - Url string - billingDate time.Time + URL string + BillingDate time.Time + connection *sqlx.DB +} +type transactionContext struct { + context.Context + billingDate time.Time namespace *string aggregatedBucket *AggregatedBucket - connection *sqlx.DB transaction *sqlx.Tx tenant *db.Tenant category *db.Category @@ -76,7 +80,7 @@ type Database struct { // OpenConnection opens a connection to the postgres database func (d *Database) OpenConnection() error { - connection, err := db.Openx(d.Url) + connection, err := db.Openx(d.URL) if err != nil { return fmt.Errorf("cannot create a connection to the database: %w", err) } @@ -96,14 +100,17 @@ func (d *Database) CloseConnection() error { // EnsureBucketUsage saves the aggregated buckets usage by namespace to the postgresql database // To save the correct data to the database the function also matches a relevant product, discount (if any) and query. // The storage usage is referred to a day before the application ran (yesterday) -func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket, billingDate time.Time) error { +func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket) error { log := ctrl.LoggerFrom(ctx) log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed) - d.namespace = &namespace - d.billingDate = billingDate - d.aggregatedBucket = &aggregatedBucket - p := pipeline.NewPipeline[context.Context]() + tctx := &transactionContext{ + Context: ctx, + namespace: &namespace, + aggregatedBucket: &aggregatedBucket, + billingDate: d.BillingDate, + } + p := pipeline.NewPipeline[*transactionContext]() p.WithSteps( p.NewStep("Begin database transaction", d.beginTransaction), p.NewStep("Ensure necessary models", d.ensureModels), @@ -111,89 +118,90 @@ func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggr p.NewStep("Save facts", d.saveFacts), p.NewStep("Commit transaction", d.commitTransaction), ) - err := p.RunWithContext(ctx) + err := p.RunWithContext(tctx) if err != nil { - d.transaction.Rollback() - return fmt.Errorf("could not save to database: %w", err) + log.Info("Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error()) + tctx.transaction.Rollback() + return err } return nil } -func (d *Database) beginTransaction(ctx context.Context) error { +func (d *Database) beginTransaction(ctx *transactionContext) error { tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{}) if err != nil { - return fmt.Errorf("cannot create database transaction for namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot create database transaction for namespace %s: %w", *ctx.namespace, err) } - d.transaction = tx + ctx.transaction = tx return nil } -func (d *Database) ensureModels(ctx context.Context) error { - namespace := *d.namespace - tenant, err := tenantsmodel.Ensure(ctx, d.transaction, &db.Tenant{Source: d.aggregatedBucket.Organization}) +func (d *Database) ensureModels(ctx *transactionContext) error { + namespace := *ctx.namespace + tenant, err := tenantsmodel.Ensure(ctx, ctx.transaction, &db.Tenant{Source: ctx.aggregatedBucket.Organization}) if err != nil { return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err) } - d.tenant = tenant + ctx.tenant = tenant - category, err := categoriesmodel.Ensure(ctx, d.transaction, &db.Category{Source: provider + ":" + namespace}) + category, err := categoriesmodel.Ensure(ctx, ctx.transaction, &db.Category{Source: provider + ":" + namespace}) if err != nil { return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err) } - d.category = category + ctx.category = category - dateTime := datetimesmodel.New(d.billingDate) - dateTime, err = datetimesmodel.Ensure(ctx, d.transaction, dateTime) + dateTime := datetimesmodel.New(ctx.billingDate) + dateTime, err = datetimesmodel.Ensure(ctx, ctx.transaction, dateTime) if err != nil { return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err) } - d.dateTime = dateTime + ctx.dateTime = dateTime return nil } -func (d *Database) getBestMatch(ctx context.Context) error { - namespace := *d.namespace - productMatch, err := productsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate) +func (d *Database) getBestMatch(ctx *transactionContext) error { + namespace := *ctx.namespace + productMatch, err := productsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate) if err != nil { return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err) } - d.product = productMatch + ctx.product = productMatch - discountMatch, err := discountsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate) + discountMatch, err := discountsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate) if err != nil { return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err) } - d.discount = discountMatch + ctx.discount = discountMatch - queryMatch, err := queriesmodel.GetByName(ctx, d.transaction, queryAndZone) + queryMatch, err := queriesmodel.GetByName(ctx, ctx.transaction, queryAndZone) if err != nil { return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err) } - d.query = queryMatch + ctx.query = queryMatch var quantity float64 if query.Unit == defaultUnit { - quantity = d.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000 + quantity = ctx.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000 } else { return fmt.Errorf("unknown query unit %s", query.Unit) } - d.quantity = &quantity + ctx.quantity = &quantity return nil } -func (d *Database) saveFacts(ctx context.Context) error { - storageFact := factsmodel.New(d.dateTime, d.query, d.tenant, d.category, d.product, d.discount, *d.quantity) - _, err := factsmodel.Ensure(ctx, d.transaction, storageFact) +func (d *Database) saveFacts(ctx *transactionContext) error { + storageFact := factsmodel.New(ctx.dateTime, ctx.query, ctx.tenant, ctx.category, ctx.product, ctx.discount, *ctx.quantity) + _, err := factsmodel.Ensure(ctx, ctx.transaction, storageFact) if err != nil { - return fmt.Errorf("cannot save fact for namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot save fact for namespace %s: %w", *ctx.namespace, err) } return nil } -func (d *Database) commitTransaction(_ context.Context) error { - err := d.transaction.Commit() +func (d *Database) commitTransaction(ctx *transactionContext) error { + err := ctx.transaction.Commit() if err != nil { - return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *ctx.namespace, err) } return nil } diff --git a/pkg/sos/objectstorage.go b/pkg/sos/objectstorage.go index 8b83ddf..e09bebf 100644 --- a/pkg/sos/objectstorage.go +++ b/pkg/sos/objectstorage.go @@ -22,13 +22,13 @@ const ( exoscaleBillingHour = 6 ) +// ObjectStorage contains exoscale and k8s clients along with the database connection type ObjectStorage struct { - k8sClient *k8s.Client + k8sClient k8s.Client exoscaleClient *egoscale.Client database *db.Database bucketDetails []BucketDetail aggregatedBuckets map[string]db.AggregatedBucket - billingDate time.Time } type BucketDetail struct { @@ -38,8 +38,8 @@ type BucketDetail struct { func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage { return ObjectStorage{ exoscaleClient: exoscaleClient, - k8sClient: k8sClient, - database: &db.Database{Url: databaseURL}, + k8sClient: *k8sClient, + database: &db.Database{URL: databaseURL}, } } @@ -82,9 +82,8 @@ func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error { log.Info("Fetching buckets from cluster") buckets := exoscalev1.BucketList{} - client := *o.k8sClient log.V(1).Info("Listing buckets from cluster") - err := client.List(ctx, &buckets) + err := o.k8sClient.List(ctx, &buckets) if err != nil { return fmt.Errorf("cannot list buckets: %w", err) } @@ -111,9 +110,9 @@ func (o *ObjectStorage) saveToDatabase(ctx context.Context) error { log.V(1).Info("Saving buckets information usage to database") for namespace, aggregatedBucket := range o.aggregatedBuckets { - err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate) + err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket) if err != nil { - log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error()) + return err } } @@ -127,44 +126,29 @@ func (o *ObjectStorage) getBillingDate(_ context.Context) error { } now := time.Now().In(location) previousDay := now.Day() - 1 - o.billingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location()) + o.database.BillingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location()) return nil } func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail) map[string]db.AggregatedBucket { log := ctrl.LoggerFrom(ctx) log.Info("Aggregating buckets by namespace") + + sosBucketsUsageMap := make(map[string]oapi.SosBucketUsage, len(sosBucketsUsage)) + for _, usage := range sosBucketsUsage { + sosBucketsUsageMap[*usage.Name] = usage + } + aggregatedBuckets := make(map[string]db.AggregatedBucket) for _, bucketDetail := range bucketDetails { log.V(1).Info("Checking bucket", "bucket", bucketDetail.BucketName) - storageUsed := float64(0) - usageBucketExists := false - - // Match managed kubernetes resource bucket to the exoscale usage bucket to retrieve the amount of storage used - for _, bucketUsage := range sosBucketsUsage { - if bucketDetail.BucketName == *bucketUsage.Name { - log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name) - storageUsed = float64(*bucketUsage.Size) - usageBucketExists = true - break - } - } - // Create AggregatedBucket object only if the object exists in exoscale - if usageBucketExists { - aggregatedBucket, exists := aggregatedBuckets[bucketDetail.Namespace] - if exists { - log.V(1).Info("AggregatedBucket exists, adding its used storage to the current AggregatedBucket", - "bucket", bucketDetail.BucketName, "storage to be added", storageUsed) - aggregatedBucket.StorageUsed = aggregatedBucket.StorageUsed + storageUsed - aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket - } else { - log.V(1).Info("Adding new AggregatedBucket object", "bucket", bucketDetail.BucketName, "bucket storage", storageUsed) - aggregatedBuckets[bucketDetail.Namespace] = db.AggregatedBucket{ - Organization: bucketDetail.Organization, - StorageUsed: storageUsed, - } - } + if bucketUsage, exists := sosBucketsUsageMap[bucketDetail.BucketName]; exists { + log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name) + aggregatedBucket := aggregatedBuckets[bucketDetail.Namespace] + aggregatedBucket.Organization = bucketDetail.Organization + aggregatedBucket.StorageUsed += float64(*bucketUsage.Size) + aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket } else { log.Info("Could not find any bucket on exoscale", "bucket", bucketDetail.BucketName) } diff --git a/pkg/sos/objectstorage_test.go b/pkg/sos/objectstorage_test.go index 0057488..cc48c9b 100644 --- a/pkg/sos/objectstorage_test.go +++ b/pkg/sos/objectstorage_test.go @@ -99,7 +99,7 @@ func TestObjectStorage_GetAggregatedBuckets(t *testing.T) { } } -func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { +func TestObjectStorage_AddOrgAndNamespaceToBucket(t *testing.T) { tests := map[string]struct { givenBucketList exoscalev1.BucketList expectedBucketDetails []BucketDetail