changed code after review
zugao committed Oct 6, 2022
1 parent ee73039 commit 0b79ff0
Showing 3 changed files with 70 additions and 78 deletions.
90 changes: 49 additions & 41 deletions pkg/database/database.go
@@ -58,12 +58,16 @@ type AggregatedBucket struct {

// Database holds raw url of the postgresql database with the opened connection
type Database struct {
Url string
billingDate time.Time
URL string
BillingDate time.Time
connection *sqlx.DB
}

type transactionContext struct {
context.Context
billingDate time.Time
namespace *string
aggregatedBucket *AggregatedBucket
connection *sqlx.DB
transaction *sqlx.Tx
tenant *db.Tenant
category *db.Category
@@ -76,7 +80,7 @@ type Database struct {

// OpenConnection opens a connection to the postgres database
func (d *Database) OpenConnection() error {
connection, err := db.Openx(d.Url)
connection, err := db.Openx(d.URL)
if err != nil {
return fmt.Errorf("cannot create a connection to the database: %w", err)
}
@@ -96,104 +100,108 @@ func (d *Database) CloseConnection() error {
// EnsureBucketUsage saves the aggregated buckets usage by namespace to the postgresql database
// To save the correct data to the database the function also matches a relevant product, discount (if any) and query.
// The storage usage is referred to a day before the application ran (yesterday)
func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket, billingDate time.Time) error {
func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed)

d.namespace = &namespace
d.billingDate = billingDate
d.aggregatedBucket = &aggregatedBucket
p := pipeline.NewPipeline[context.Context]()
tctx := &transactionContext{
Context: ctx,
namespace: &namespace,
aggregatedBucket: &aggregatedBucket,
billingDate: d.BillingDate,
}
p := pipeline.NewPipeline[*transactionContext]()
p.WithSteps(
p.NewStep("Begin database transaction", d.beginTransaction),
p.NewStep("Ensure necessary models", d.ensureModels),
p.NewStep("Get best match", d.getBestMatch),
p.NewStep("Save facts", d.saveFacts),
p.NewStep("Commit transaction", d.commitTransaction),
)
err := p.RunWithContext(ctx)
err := p.RunWithContext(tctx)
if err != nil {
d.transaction.Rollback()
return fmt.Errorf("could not save to database: %w", err)
log.Info("Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error())
tctx.transaction.Rollback()
return err
}
return nil
}

func (d *Database) beginTransaction(ctx context.Context) error {
func (d *Database) beginTransaction(ctx *transactionContext) error {
tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("cannot create database transaction for namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot create database transaction for namespace %s: %w", *ctx.namespace, err)
}
d.transaction = tx
ctx.transaction = tx
return nil
}

func (d *Database) ensureModels(ctx context.Context) error {
namespace := *d.namespace
tenant, err := tenantsmodel.Ensure(ctx, d.transaction, &db.Tenant{Source: d.aggregatedBucket.Organization})
func (d *Database) ensureModels(ctx *transactionContext) error {
namespace := *ctx.namespace
tenant, err := tenantsmodel.Ensure(ctx, ctx.transaction, &db.Tenant{Source: ctx.aggregatedBucket.Organization})
if err != nil {
return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err)
}
d.tenant = tenant
ctx.tenant = tenant

category, err := categoriesmodel.Ensure(ctx, d.transaction, &db.Category{Source: provider + ":" + namespace})
category, err := categoriesmodel.Ensure(ctx, ctx.transaction, &db.Category{Source: provider + ":" + namespace})
if err != nil {
return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err)
}
d.category = category
ctx.category = category

dateTime := datetimesmodel.New(d.billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, d.transaction, dateTime)
dateTime := datetimesmodel.New(ctx.billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, ctx.transaction, dateTime)
if err != nil {
return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err)
}
d.dateTime = dateTime
ctx.dateTime = dateTime
return nil
}

func (d *Database) getBestMatch(ctx context.Context) error {
namespace := *d.namespace
productMatch, err := productsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
func (d *Database) getBestMatch(ctx *transactionContext) error {
namespace := *ctx.namespace
productMatch, err := productsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate)
if err != nil {
return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err)
}
d.product = productMatch
ctx.product = productMatch

discountMatch, err := discountsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
discountMatch, err := discountsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate)
if err != nil {
return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err)
}
d.discount = discountMatch
ctx.discount = discountMatch

queryMatch, err := queriesmodel.GetByName(ctx, d.transaction, queryAndZone)
queryMatch, err := queriesmodel.GetByName(ctx, ctx.transaction, queryAndZone)
if err != nil {
return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err)
}
d.query = queryMatch
ctx.query = queryMatch

var quantity float64
if query.Unit == defaultUnit {
quantity = d.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000
quantity = ctx.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000
} else {
return fmt.Errorf("unknown query unit %s", query.Unit)
}
d.quantity = &quantity
ctx.quantity = &quantity
return nil
}

func (d *Database) saveFacts(ctx context.Context) error {
storageFact := factsmodel.New(d.dateTime, d.query, d.tenant, d.category, d.product, d.discount, *d.quantity)
_, err := factsmodel.Ensure(ctx, d.transaction, storageFact)
func (d *Database) saveFacts(ctx *transactionContext) error {
storageFact := factsmodel.New(ctx.dateTime, ctx.query, ctx.tenant, ctx.category, ctx.product, ctx.discount, *ctx.quantity)
_, err := factsmodel.Ensure(ctx, ctx.transaction, storageFact)
if err != nil {
return fmt.Errorf("cannot save fact for namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot save fact for namespace %s: %w", *ctx.namespace, err)
}
return nil
}

func (d *Database) commitTransaction(_ context.Context) error {
err := d.transaction.Commit()
func (d *Database) commitTransaction(ctx *transactionContext) error {
err := ctx.transaction.Commit()
if err != nil {
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *ctx.namespace, err)
}
return nil
}
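
The core of this change: per-call state (namespace, billing date, transaction, and the ensured models) moves off the shared Database struct into a transactionContext that embeds context.Context and is threaded through the pipeline steps, so overlapping EnsureBucketUsage calls no longer share mutable fields and the rollback uses the transaction carried by the failing run. A minimal, self-contained sketch of that pattern — the tiny runner below stands in for the generic pipeline library the code actually uses, and the field names are illustrative:

package main

import (
	"context"
	"fmt"
)

// txContext carries everything one invocation needs; nothing is stored on a
// shared receiver, so concurrent invocations cannot clobber each other.
type txContext struct {
	context.Context
	namespace string
	quantity  float64
}

type step func(*txContext) error

// run executes the steps in order and stops at the first error, roughly what
// a pipeline's RunWithContext does.
func run(tctx *txContext, steps ...step) error {
	for _, s := range steps {
		if err := s(tctx); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	tctx := &txContext{Context: context.Background(), namespace: "my-namespace"}
	err := run(tctx,
		// step 1 writes its result into the context...
		func(c *txContext) error { c.quantity = 1e9 / 1000 / 1000 / 1000; return nil },
		// ...and step 2 reads it back, instead of both touching shared struct fields.
		func(c *txContext) error {
			fmt.Printf("namespace=%s quantity=%.2f\n", c.namespace, c.quantity)
			return nil
		},
	)
	if err != nil {
		fmt.Println("pipeline aborted:", err)
	}
}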
56 changes: 20 additions & 36 deletions pkg/sos/objectstorage.go
@@ -22,13 +22,13 @@ const (
exoscaleBillingHour = 6
)

// ObjectStorage contains exoscale and k8s clients along with the database connection
type ObjectStorage struct {
k8sClient *k8s.Client
k8sClient k8s.Client
exoscaleClient *egoscale.Client
database *db.Database
bucketDetails []BucketDetail
aggregatedBuckets map[string]db.AggregatedBucket
billingDate time.Time
}

type BucketDetail struct {
@@ -38,8 +38,8 @@ type BucketDetail struct {
func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage {
return ObjectStorage{
exoscaleClient: exoscaleClient,
k8sClient: k8sClient,
database: &db.Database{Url: databaseURL},
k8sClient: *k8sClient,
database: &db.Database{URL: databaseURL},
}
}

@@ -82,9 +82,8 @@ func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error {
log.Info("Fetching buckets from cluster")

buckets := exoscalev1.BucketList{}
client := *o.k8sClient
log.V(1).Info("Listing buckets from cluster")
err := client.List(ctx, &buckets)
err := o.k8sClient.List(ctx, &buckets)
if err != nil {
return fmt.Errorf("cannot list buckets: %w", err)
}
@@ -111,9 +110,9 @@ func (o *ObjectStorage) saveToDatabase(ctx context.Context) error {

log.V(1).Info("Saving buckets information usage to database")
for namespace, aggregatedBucket := range o.aggregatedBuckets {
err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate)
err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket)
if err != nil {
log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error())
return err
}
}

@@ -127,44 +126,29 @@ func (o *ObjectStorage) getBillingDate(_ context.Context) error {
}
now := time.Now().In(location)
previousDay := now.Day() - 1
o.billingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location())
o.database.BillingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location())
return nil
}

func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail) map[string]db.AggregatedBucket {
log := ctrl.LoggerFrom(ctx)
log.Info("Aggregating buckets by namespace")

sosBucketsUsageMap := make(map[string]oapi.SosBucketUsage, len(sosBucketsUsage))
for _, usage := range sosBucketsUsage {
sosBucketsUsageMap[*usage.Name] = usage
}

aggregatedBuckets := make(map[string]db.AggregatedBucket)
for _, bucketDetail := range bucketDetails {
log.V(1).Info("Checking bucket", "bucket", bucketDetail.BucketName)
storageUsed := float64(0)
usageBucketExists := false

// Match managed kubernetes resource bucket to the exoscale usage bucket to retrieve the amount of storage used
for _, bucketUsage := range sosBucketsUsage {
if bucketDetail.BucketName == *bucketUsage.Name {
log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name)
storageUsed = float64(*bucketUsage.Size)
usageBucketExists = true
break
}
}

// Create AggregatedBucket object only if the object exists in exoscale
if usageBucketExists {
aggregatedBucket, exists := aggregatedBuckets[bucketDetail.Namespace]
if exists {
log.V(1).Info("AggregatedBucket exists, adding its used storage to the current AggregatedBucket",
"bucket", bucketDetail.BucketName, "storage to be added", storageUsed)
aggregatedBucket.StorageUsed = aggregatedBucket.StorageUsed + storageUsed
aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket
} else {
log.V(1).Info("Adding new AggregatedBucket object", "bucket", bucketDetail.BucketName, "bucket storage", storageUsed)
aggregatedBuckets[bucketDetail.Namespace] = db.AggregatedBucket{
Organization: bucketDetail.Organization,
StorageUsed: storageUsed,
}
}
if bucketUsage, exists := sosBucketsUsageMap[bucketDetail.BucketName]; exists {
log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name)
aggregatedBucket := aggregatedBuckets[bucketDetail.Namespace]
aggregatedBucket.Organization = bucketDetail.Organization
aggregatedBucket.StorageUsed += float64(*bucketUsage.Size)
aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket
} else {
log.Info("Could not find any bucket on exoscale", "bucket", bucketDetail.BucketName)
}
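
A note on getBillingDate: the date now lands on o.database.BillingDate rather than on a field of ObjectStorage, matching the exported BillingDate field introduced in database.go. The calculation itself leans on time.Date normalizing out-of-range values, so now.Day()-1 on the first of a month rolls back into the previous month. A small stand-alone check of that behaviour — billingHour mirrors the exoscaleBillingHour constant from the diff, the rest is illustrative:

package main

import (
	"fmt"
	"time"
)

const billingHour = 6 // mirrors exoscaleBillingHour from the diff

// yesterdayAtBillingHour reproduces the billing-date calculation: time.Date
// normalizes out-of-range days, so day 0 becomes the last day of the previous month.
func yesterdayAtBillingHour(now time.Time) time.Time {
	return time.Date(now.Year(), now.Month(), now.Day()-1, billingHour, 0, 0, 0, now.Location())
}

func main() {
	firstOfMarch := time.Date(2022, time.March, 1, 12, 0, 0, 0, time.UTC)
	fmt.Println(yesterdayAtBillingHour(firstOfMarch)) // 2022-02-28 06:00:00 +0000 UTC
}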
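
getAggregatedBuckets replaces the nested loop, in which every managed bucket rescanned the whole usage slice, with a map keyed by bucket name that is built once up front, and it drops the explicit exists branch by relying on the map's zero value when accumulating per namespace. A self-contained sketch of that shape — the types and names below are illustrative, not the package's real ones:

package main

import "fmt"

type bucketDetail struct {
	Name, Namespace, Organization string
}

type aggregated struct {
	Organization string
	StorageUsed  float64
}

// aggregate indexes usage by bucket name once, then each managed bucket does an
// O(1) lookup instead of rescanning the usage list; namespaces not seen yet start
// from the map's zero value, so no explicit "exists" branch is needed.
func aggregate(details []bucketDetail, usageByName map[string]float64) map[string]aggregated {
	out := make(map[string]aggregated)
	for _, d := range details {
		size, ok := usageByName[d.Name]
		if !ok {
			continue // managed in the cluster, but no matching bucket on the provider side
		}
		agg := out[d.Namespace]
		agg.Organization = d.Organization
		agg.StorageUsed += size
		out[d.Namespace] = agg
	}
	return out
}

func main() {
	details := []bucketDetail{
		{Name: "bucket-a", Namespace: "ns1", Organization: "org1"},
		{Name: "bucket-b", Namespace: "ns1", Organization: "org1"},
	}
	usage := map[string]float64{"bucket-a": 1e9, "bucket-b": 2e9}
	fmt.Println(aggregate(details, usage)) // map[ns1:{org1 3e+09}]
}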
2 changes: 1 addition & 1 deletion pkg/sos/objectstorage_test.go
@@ -99,7 +99,7 @@ func TestObjectStorage_GetAggregatedBuckets(t *testing.T) {
}
}

func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
func TestObjectStorage_AddOrgAndNamespaceToBucket(t *testing.T) {
tests := map[string]struct {
givenBucketList exoscalev1.BucketList
expectedBucketDetails []BucketDetail
