From 612056ad993e303ea64f4cb87d4138a64f1faa3c Mon Sep 17 00:00:00 2001 From: Gabriel Saratura Date: Wed, 5 Oct 2022 18:16:40 +0300 Subject: [PATCH] changed code after review --- pkg/database/database.go | 87 +++++++++++++++++++---------------- pkg/sos/objectstorage.go | 50 +++++++------------- pkg/sos/objectstorage_test.go | 2 +- 3 files changed, 65 insertions(+), 74 deletions(-) diff --git a/pkg/database/database.go b/pkg/database/database.go index 41c8abe..1dd6381 100644 --- a/pkg/database/database.go +++ b/pkg/database/database.go @@ -58,12 +58,15 @@ type AggregatedBucket struct { // Database holds raw url of the postgresql database with the opened connection type Database struct { - Url string - billingDate time.Time + URL string + connection *sqlx.DB +} +type transactionContext struct { + context.Context + billingDate time.Time namespace *string aggregatedBucket *AggregatedBucket - connection *sqlx.DB transaction *sqlx.Tx tenant *db.Tenant category *db.Category @@ -76,7 +79,7 @@ type Database struct { // OpenConnection opens a connection to the postgres database func (d *Database) OpenConnection() error { - connection, err := db.Openx(d.Url) + connection, err := db.Openx(d.URL) if err != nil { return fmt.Errorf("cannot create a connection to the database: %w", err) } @@ -100,10 +103,13 @@ func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggr log := ctrl.LoggerFrom(ctx) log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed) - d.namespace = &namespace - d.billingDate = billingDate - d.aggregatedBucket = &aggregatedBucket - p := pipeline.NewPipeline[context.Context]() + tctx := &transactionContext{ + Context: ctx, + namespace: &namespace, + aggregatedBucket: &aggregatedBucket, + billingDate: billingDate, + } + p := pipeline.NewPipeline[*transactionContext]() p.WithSteps( p.NewStep("Begin database transaction", d.beginTransaction), p.NewStep("Ensure necessary models", d.ensureModels), @@ -111,89 +117,90 @@ func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggr p.NewStep("Save facts", d.saveFacts), p.NewStep("Commit transaction", d.commitTransaction), ) - err := p.RunWithContext(ctx) + err := p.RunWithContext(tctx) if err != nil { - d.transaction.Rollback() - return fmt.Errorf("could not save to database: %w", err) + log.Info("Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error()) + tctx.transaction.Rollback() + return err } return nil } -func (d *Database) beginTransaction(ctx context.Context) error { +func (d *Database) beginTransaction(ctx *transactionContext) error { tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{}) if err != nil { - return fmt.Errorf("cannot create database transaction for namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot create database transaction for namespace %s: %w", *ctx.namespace, err) } - d.transaction = tx + ctx.transaction = tx return nil } -func (d *Database) ensureModels(ctx context.Context) error { - namespace := *d.namespace - tenant, err := tenantsmodel.Ensure(ctx, d.transaction, &db.Tenant{Source: d.aggregatedBucket.Organization}) +func (d *Database) ensureModels(ctx *transactionContext) error { + namespace := *ctx.namespace + tenant, err := tenantsmodel.Ensure(ctx, ctx.transaction, &db.Tenant{Source: ctx.aggregatedBucket.Organization}) if err != nil { return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err) } - d.tenant = tenant + ctx.tenant = tenant - category, err := categoriesmodel.Ensure(ctx, d.transaction, &db.Category{Source: provider + ":" + namespace}) + category, err := categoriesmodel.Ensure(ctx, ctx.transaction, &db.Category{Source: provider + ":" + namespace}) if err != nil { return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err) } - d.category = category + ctx.category = category - dateTime := datetimesmodel.New(d.billingDate) - dateTime, err = datetimesmodel.Ensure(ctx, d.transaction, dateTime) + dateTime := datetimesmodel.New(ctx.billingDate) + dateTime, err = datetimesmodel.Ensure(ctx, ctx.transaction, dateTime) if err != nil { return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err) } - d.dateTime = dateTime + ctx.dateTime = dateTime return nil } -func (d *Database) getBestMatch(ctx context.Context) error { - namespace := *d.namespace - productMatch, err := productsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate) +func (d *Database) getBestMatch(ctx *transactionContext) error { + namespace := *ctx.namespace + productMatch, err := productsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate) if err != nil { return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err) } - d.product = productMatch + ctx.product = productMatch - discountMatch, err := discountsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate) + discountMatch, err := discountsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate) if err != nil { return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err) } - d.discount = discountMatch + ctx.discount = discountMatch - queryMatch, err := queriesmodel.GetByName(ctx, d.transaction, queryAndZone) + queryMatch, err := queriesmodel.GetByName(ctx, ctx.transaction, queryAndZone) if err != nil { return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err) } - d.query = queryMatch + ctx.query = queryMatch var quantity float64 if query.Unit == defaultUnit { - quantity = d.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000 + quantity = ctx.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000 } else { return fmt.Errorf("unknown query unit %s", query.Unit) } - d.quantity = &quantity + ctx.quantity = &quantity return nil } -func (d *Database) saveFacts(ctx context.Context) error { - storageFact := factsmodel.New(d.dateTime, d.query, d.tenant, d.category, d.product, d.discount, *d.quantity) - _, err := factsmodel.Ensure(ctx, d.transaction, storageFact) +func (d *Database) saveFacts(ctx *transactionContext) error { + storageFact := factsmodel.New(ctx.dateTime, ctx.query, ctx.tenant, ctx.category, ctx.product, ctx.discount, *ctx.quantity) + _, err := factsmodel.Ensure(ctx, ctx.transaction, storageFact) if err != nil { - return fmt.Errorf("cannot save fact for namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot save fact for namespace %s: %w", *ctx.namespace, err) } return nil } -func (d *Database) commitTransaction(_ context.Context) error { - err := d.transaction.Commit() +func (d *Database) commitTransaction(ctx *transactionContext) error { + err := ctx.transaction.Commit() if err != nil { - return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *d.namespace, err) + return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *ctx.namespace, err) } return nil } diff --git a/pkg/sos/objectstorage.go b/pkg/sos/objectstorage.go index 8b83ddf..929b073 100644 --- a/pkg/sos/objectstorage.go +++ b/pkg/sos/objectstorage.go @@ -23,7 +23,7 @@ const ( ) type ObjectStorage struct { - k8sClient *k8s.Client + k8sClient k8s.Client exoscaleClient *egoscale.Client database *db.Database bucketDetails []BucketDetail @@ -38,8 +38,8 @@ type BucketDetail struct { func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage { return ObjectStorage{ exoscaleClient: exoscaleClient, - k8sClient: k8sClient, - database: &db.Database{Url: databaseURL}, + k8sClient: *k8sClient, + database: &db.Database{URL: databaseURL}, } } @@ -82,9 +82,8 @@ func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error { log.Info("Fetching buckets from cluster") buckets := exoscalev1.BucketList{} - client := *o.k8sClient log.V(1).Info("Listing buckets from cluster") - err := client.List(ctx, &buckets) + err := o.k8sClient.List(ctx, &buckets) if err != nil { return fmt.Errorf("cannot list buckets: %w", err) } @@ -113,7 +112,7 @@ func (o *ObjectStorage) saveToDatabase(ctx context.Context) error { for namespace, aggregatedBucket := range o.aggregatedBuckets { err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate) if err != nil { - log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error()) + return err } } @@ -134,37 +133,22 @@ func (o *ObjectStorage) getBillingDate(_ context.Context) error { func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail) map[string]db.AggregatedBucket { log := ctrl.LoggerFrom(ctx) log.Info("Aggregating buckets by namespace") + + sosBucketsUsageMap := make(map[string]oapi.SosBucketUsage, len(sosBucketsUsage)) + for _, usage := range sosBucketsUsage { + sosBucketsUsageMap[*usage.Name] = usage + } + aggregatedBuckets := make(map[string]db.AggregatedBucket) for _, bucketDetail := range bucketDetails { log.V(1).Info("Checking bucket", "bucket", bucketDetail.BucketName) - storageUsed := float64(0) - usageBucketExists := false - - // Match managed kubernetes resource bucket to the exoscale usage bucket to retrieve the amount of storage used - for _, bucketUsage := range sosBucketsUsage { - if bucketDetail.BucketName == *bucketUsage.Name { - log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name) - storageUsed = float64(*bucketUsage.Size) - usageBucketExists = true - break - } - } - // Create AggregatedBucket object only if the object exists in exoscale - if usageBucketExists { - aggregatedBucket, exists := aggregatedBuckets[bucketDetail.Namespace] - if exists { - log.V(1).Info("AggregatedBucket exists, adding its used storage to the current AggregatedBucket", - "bucket", bucketDetail.BucketName, "storage to be added", storageUsed) - aggregatedBucket.StorageUsed = aggregatedBucket.StorageUsed + storageUsed - aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket - } else { - log.V(1).Info("Adding new AggregatedBucket object", "bucket", bucketDetail.BucketName, "bucket storage", storageUsed) - aggregatedBuckets[bucketDetail.Namespace] = db.AggregatedBucket{ - Organization: bucketDetail.Organization, - StorageUsed: storageUsed, - } - } + if bucketUsage, exists := sosBucketsUsageMap[bucketDetail.BucketName]; exists { + log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name) + aggregatedBucket := aggregatedBuckets[bucketDetail.Namespace] + aggregatedBucket.Organization = bucketDetail.Organization + aggregatedBucket.StorageUsed += float64(*bucketUsage.Size) + aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket } else { log.Info("Could not find any bucket on exoscale", "bucket", bucketDetail.BucketName) } diff --git a/pkg/sos/objectstorage_test.go b/pkg/sos/objectstorage_test.go index 0057488..cc48c9b 100644 --- a/pkg/sos/objectstorage_test.go +++ b/pkg/sos/objectstorage_test.go @@ -99,7 +99,7 @@ func TestObjectStorage_GetAggregatedBuckets(t *testing.T) { } } -func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) { +func TestObjectStorage_AddOrgAndNamespaceToBucket(t *testing.T) { tests := map[string]struct { givenBucketList exoscalev1.BucketList expectedBucketDetails []BucketDetail