Skip to content

Commit

Permalink
changed code after review
Browse files Browse the repository at this point in the history
  • Loading branch information
zugao committed Oct 6, 2022
1 parent ee73039 commit 96c0a4e
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 84 deletions.
90 changes: 49 additions & 41 deletions pkg/database/database.go
Expand Up @@ -58,12 +58,16 @@ type AggregatedBucket struct {

// Database holds raw url of the postgresql database with the opened connection
type Database struct {
Url string
billingDate time.Time
URL string
BillingDate time.Time
connection *sqlx.DB
}

type transactionContext struct {
context.Context
billingDate time.Time
namespace *string
aggregatedBucket *AggregatedBucket
connection *sqlx.DB
transaction *sqlx.Tx
tenant *db.Tenant
category *db.Category
Expand All @@ -76,7 +80,7 @@ type Database struct {

// OpenConnection opens a connection to the postgres database
func (d *Database) OpenConnection() error {
connection, err := db.Openx(d.Url)
connection, err := db.Openx(d.URL)
if err != nil {
return fmt.Errorf("cannot create a connection to the database: %w", err)
}
Expand All @@ -96,104 +100,108 @@ func (d *Database) CloseConnection() error {
// EnsureBucketUsage saves the aggregated buckets usage by namespace to the postgresql database
// To save the correct data to the database the function also matches a relevant product, discount (if any) and query.
// The storage usage is referred to a day before the application ran (yesterday)
func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket, billingDate time.Time) error {
func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed)

d.namespace = &namespace
d.billingDate = billingDate
d.aggregatedBucket = &aggregatedBucket
p := pipeline.NewPipeline[context.Context]()
tctx := &transactionContext{
Context: ctx,
namespace: &namespace,
aggregatedBucket: &aggregatedBucket,
billingDate: d.BillingDate,
}
p := pipeline.NewPipeline[*transactionContext]()
p.WithSteps(
p.NewStep("Begin database transaction", d.beginTransaction),
p.NewStep("Ensure necessary models", d.ensureModels),
p.NewStep("Get best match", d.getBestMatch),
p.NewStep("Save facts", d.saveFacts),
p.NewStep("Commit transaction", d.commitTransaction),
)
err := p.RunWithContext(ctx)
err := p.RunWithContext(tctx)
if err != nil {
d.transaction.Rollback()
return fmt.Errorf("could not save to database: %w", err)
log.Info("Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error())
tctx.transaction.Rollback()
return err
}
return nil
}

func (d *Database) beginTransaction(ctx context.Context) error {
func (d *Database) beginTransaction(ctx *transactionContext) error {
tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("cannot create database transaction for namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot create database transaction for namespace %s: %w", *ctx.namespace, err)
}
d.transaction = tx
ctx.transaction = tx
return nil
}

func (d *Database) ensureModels(ctx context.Context) error {
namespace := *d.namespace
tenant, err := tenantsmodel.Ensure(ctx, d.transaction, &db.Tenant{Source: d.aggregatedBucket.Organization})
func (d *Database) ensureModels(ctx *transactionContext) error {
namespace := *ctx.namespace
tenant, err := tenantsmodel.Ensure(ctx, ctx.transaction, &db.Tenant{Source: ctx.aggregatedBucket.Organization})
if err != nil {
return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err)
}
d.tenant = tenant
ctx.tenant = tenant

category, err := categoriesmodel.Ensure(ctx, d.transaction, &db.Category{Source: provider + ":" + namespace})
category, err := categoriesmodel.Ensure(ctx, ctx.transaction, &db.Category{Source: provider + ":" + namespace})
if err != nil {
return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err)
}
d.category = category
ctx.category = category

dateTime := datetimesmodel.New(d.billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, d.transaction, dateTime)
dateTime := datetimesmodel.New(ctx.billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, ctx.transaction, dateTime)
if err != nil {
return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err)
}
d.dateTime = dateTime
ctx.dateTime = dateTime
return nil
}

func (d *Database) getBestMatch(ctx context.Context) error {
namespace := *d.namespace
productMatch, err := productsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
func (d *Database) getBestMatch(ctx *transactionContext) error {
namespace := *ctx.namespace
productMatch, err := productsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate)
if err != nil {
return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err)
}
d.product = productMatch
ctx.product = productMatch

discountMatch, err := discountsmodel.GetBestMatch(ctx, d.transaction, getSourceString(namespace, d.aggregatedBucket.Organization), d.billingDate)
discountMatch, err := discountsmodel.GetBestMatch(ctx, ctx.transaction, getSourceString(namespace, ctx.aggregatedBucket.Organization), ctx.billingDate)
if err != nil {
return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err)
}
d.discount = discountMatch
ctx.discount = discountMatch

queryMatch, err := queriesmodel.GetByName(ctx, d.transaction, queryAndZone)
queryMatch, err := queriesmodel.GetByName(ctx, ctx.transaction, queryAndZone)
if err != nil {
return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err)
}
d.query = queryMatch
ctx.query = queryMatch

var quantity float64
if query.Unit == defaultUnit {
quantity = d.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000
quantity = ctx.aggregatedBucket.StorageUsed / 1000 / 1000 / 1000
} else {
return fmt.Errorf("unknown query unit %s", query.Unit)
}
d.quantity = &quantity
ctx.quantity = &quantity
return nil
}

func (d *Database) saveFacts(ctx context.Context) error {
storageFact := factsmodel.New(d.dateTime, d.query, d.tenant, d.category, d.product, d.discount, *d.quantity)
_, err := factsmodel.Ensure(ctx, d.transaction, storageFact)
func (d *Database) saveFacts(ctx *transactionContext) error {
storageFact := factsmodel.New(ctx.dateTime, ctx.query, ctx.tenant, ctx.category, ctx.product, ctx.discount, *ctx.quantity)
_, err := factsmodel.Ensure(ctx, ctx.transaction, storageFact)
if err != nil {
return fmt.Errorf("cannot save fact for namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot save fact for namespace %s: %w", *ctx.namespace, err)
}
return nil
}

func (d *Database) commitTransaction(_ context.Context) error {
err := d.transaction.Commit()
func (d *Database) commitTransaction(ctx *transactionContext) error {
err := ctx.transaction.Commit()
if err != nil {
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *d.namespace, err)
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", *ctx.namespace, err)
}
return nil
}
Expand Down
59 changes: 23 additions & 36 deletions pkg/sos/objectstorage.go
Expand Up @@ -22,27 +22,30 @@ const (
exoscaleBillingHour = 6
)

// ObjectStorage gathers bucket data from exoscale provider and cluster and saves to the database
type ObjectStorage struct {
k8sClient *k8s.Client
k8sClient k8s.Client
exoscaleClient *egoscale.Client
database *db.Database
bucketDetails []BucketDetail
aggregatedBuckets map[string]db.AggregatedBucket
billingDate time.Time
}

// BucketDetail a k8s bucket object with relevant data
type BucketDetail struct {
Organization, BucketName, Namespace string
}

//NewObjectStorage creates an ObjectStorage with the initial setup
func NewObjectStorage(exoscaleClient *egoscale.Client, k8sClient *k8s.Client, databaseURL string) ObjectStorage {
return ObjectStorage{
exoscaleClient: exoscaleClient,
k8sClient: k8sClient,
database: &db.Database{Url: databaseURL},
k8sClient: *k8sClient,
database: &db.Database{URL: databaseURL},
}
}

// Execute executes the main business logic for this application by gathering, matching and saving data to the database
func (o *ObjectStorage) Execute(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Running metrics collector by step")
Expand Down Expand Up @@ -82,9 +85,8 @@ func (o *ObjectStorage) fetchManagedBuckets(ctx context.Context) error {
log.Info("Fetching buckets from cluster")

buckets := exoscalev1.BucketList{}
client := *o.k8sClient
log.V(1).Info("Listing buckets from cluster")
err := client.List(ctx, &buckets)
err := o.k8sClient.List(ctx, &buckets)
if err != nil {
return fmt.Errorf("cannot list buckets: %w", err)
}
Expand All @@ -111,9 +113,9 @@ func (o *ObjectStorage) saveToDatabase(ctx context.Context) error {

log.V(1).Info("Saving buckets information usage to database")
for namespace, aggregatedBucket := range o.aggregatedBuckets {
err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket, o.billingDate)
err = o.database.EnsureBucketUsage(ctx, namespace, aggregatedBucket)
if err != nil {
log.Info("WARNING: Buckets usage have not been saved to the database", "namespace", namespace, "error", err.Error())
return err
}
}

Expand All @@ -127,44 +129,29 @@ func (o *ObjectStorage) getBillingDate(_ context.Context) error {
}
now := time.Now().In(location)
previousDay := now.Day() - 1
o.billingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location())
o.database.BillingDate = time.Date(now.Year(), now.Month(), previousDay, exoscaleBillingHour, 0, 0, 0, now.Location())
return nil
}

func getAggregatedBuckets(ctx context.Context, sosBucketsUsage []oapi.SosBucketUsage, bucketDetails []BucketDetail) map[string]db.AggregatedBucket {
log := ctrl.LoggerFrom(ctx)
log.Info("Aggregating buckets by namespace")

sosBucketsUsageMap := make(map[string]oapi.SosBucketUsage, len(sosBucketsUsage))
for _, usage := range sosBucketsUsage {
sosBucketsUsageMap[*usage.Name] = usage
}

aggregatedBuckets := make(map[string]db.AggregatedBucket)
for _, bucketDetail := range bucketDetails {
log.V(1).Info("Checking bucket", "bucket", bucketDetail.BucketName)
storageUsed := float64(0)
usageBucketExists := false

// Match managed kubernetes resource bucket to the exoscale usage bucket to retrieve the amount of storage used
for _, bucketUsage := range sosBucketsUsage {
if bucketDetail.BucketName == *bucketUsage.Name {
log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name)
storageUsed = float64(*bucketUsage.Size)
usageBucketExists = true
break
}
}

// Create AggregatedBucket object only if the object exists in exoscale
if usageBucketExists {
aggregatedBucket, exists := aggregatedBuckets[bucketDetail.Namespace]
if exists {
log.V(1).Info("AggregatedBucket exists, adding its used storage to the current AggregatedBucket",
"bucket", bucketDetail.BucketName, "storage to be added", storageUsed)
aggregatedBucket.StorageUsed = aggregatedBucket.StorageUsed + storageUsed
aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket
} else {
log.V(1).Info("Adding new AggregatedBucket object", "bucket", bucketDetail.BucketName, "bucket storage", storageUsed)
aggregatedBuckets[bucketDetail.Namespace] = db.AggregatedBucket{
Organization: bucketDetail.Organization,
StorageUsed: storageUsed,
}
}
if bucketUsage, exists := sosBucketsUsageMap[bucketDetail.BucketName]; exists {
log.V(1).Info("Found exoscale bucket usage", "bucket", bucketUsage.Name, "bucket size", bucketUsage.Name)
aggregatedBucket := aggregatedBuckets[bucketDetail.Namespace]
aggregatedBucket.Organization = bucketDetail.Organization
aggregatedBucket.StorageUsed += float64(*bucketUsage.Size)
aggregatedBuckets[bucketDetail.Namespace] = aggregatedBucket
} else {
log.Info("Could not find any bucket on exoscale", "bucket", bucketDetail.BucketName)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/sos/objectstorage_test.go
Expand Up @@ -20,12 +20,14 @@ func TestObjectStorage_GetBillingDate(t *testing.T) {
expected := time.Date(now.Year(), now.Month(), time.Now().Day()-1, 6, 0, 0, 0, now.Location())

//When
o := ObjectStorage{}
o := ObjectStorage{
database: &db.Database{},
}
err := o.getBillingDate(ctx)

// Then
assert.NoError(t, err)
assert.Equal(t, o.billingDate, expected)
assert.Equal(t, o.database.BillingDate, expected)
})
}

Expand Down Expand Up @@ -99,7 +101,7 @@ func TestObjectStorage_GetAggregatedBuckets(t *testing.T) {
}
}

func TestObjectStorage_AadOrgAndNamespaceToBucket(t *testing.T) {
func TestObjectStorage_AddOrgAndNamespaceToBucket(t *testing.T) {
tests := map[string]struct {
givenBucketList exoscalev1.BucketList
expectedBucketDetails []BucketDetail
Expand Down
8 changes: 4 additions & 4 deletions sos_command.go
Expand Up @@ -12,8 +12,8 @@ const (
objectStorageName = "objectstorage"
keyEnvVariable = "EXOSCALE_API_KEY"
secretEnvVariable = "EXOSCALE_API_SECRET"
dbUrlEnvVariable = "ACR_DB_URL"
k8sServerUrlEnvVariable = "K8S_SERVER_URL"
dbURLEnvVariable = "ACR_DB_URL"
k8sServerURLEnvVariable = "K8S_SERVER_URL"
k8sTokenEnvVariable = "K8S_TOKEN"
)

Expand All @@ -35,7 +35,7 @@ func NewCommand() *cli.Command {
&cli.StringFlag{
Name: "k8s-server-url",
Aliases: []string{"u"},
EnvVars: []string{k8sServerUrlEnvVariable},
EnvVars: []string{k8sServerURLEnvVariable},
Required: true,
Usage: "A Kubernetes server URL from where to get the data from",
Destination: &command.clusterURL,
Expand All @@ -51,7 +51,7 @@ func NewCommand() *cli.Command {
&cli.StringFlag{
Name: "database-url",
Aliases: []string{"d"},
EnvVars: []string{dbUrlEnvVariable},
EnvVars: []string{dbURLEnvVariable},
Required: true,
Usage: "A PostgreSQL database URL where to save relevant metrics",
Destination: &command.databaseURL,
Expand Down

0 comments on commit 96c0a4e

Please sign in to comment.