Skip to content

Commit

Permalink
Add database support
Browse files Browse the repository at this point in the history
  • Loading branch information
zugao committed Sep 23, 2022
1 parent 5f02563 commit bc9c174
Show file tree
Hide file tree
Showing 7 changed files with 457 additions and 47 deletions.
14 changes: 14 additions & 0 deletions go.mod
Expand Up @@ -8,6 +8,7 @@ require (
github.com/go-logr/logr v1.2.3
github.com/go-logr/zapr v1.2.3
github.com/urfave/cli/v2 v2.16.3
github.com/vshn/cloudscale-metrics-collector v0.3.3
github.com/vshn/provider-exoscale v0.1.0
go.uber.org/zap v1.23.0
golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b
Expand All @@ -17,8 +18,10 @@ require (
)

require (
github.com/appuio/appuio-cloud-reporting v0.5.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cloudscale-ch/cloudscale-go-sdk/v2 v2.0.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/crossplane/crossplane-runtime v0.17.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -39,8 +42,18 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.12.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.11.0 // indirect
github.com/jackc/pgx/v4 v4.16.0 // indirect
github.com/jmoiron/sqlx v1.3.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/lopezator/migrator v0.3.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -60,6 +73,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
Expand Down
124 changes: 123 additions & 1 deletion go.sum

Large diffs are not rendered by default.

24 changes: 16 additions & 8 deletions sos_command.go
Expand Up @@ -2,9 +2,11 @@ package main

import (
"github.com/urfave/cli/v2"
"github.com/vshn/exoscale-metrics-collector/src/exoscale"
"github.com/vshn/exoscale-metrics-collector/src/kubernetes"
"github.com/vshn/exoscale-metrics-collector/src/sos"

db "github.com/vshn/exoscale-metrics-collector/src/database"
"github.com/vshn/exoscale-metrics-collector/src/exoscale"
k8s "github.com/vshn/exoscale-metrics-collector/src/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand Down Expand Up @@ -40,22 +42,28 @@ func (c *objectStorageCommand) execute(ctx *cli.Context) error {
return err
}
log.Info("Creating Exoscale client")
exoscaleClient, err := exoscale.CreateExoscaleClient(accessKey, secretKey)
exoscaleClient, err := exoscale.InitClient(accessKey, secretKey)
if err != nil {
return err
}

log.Info("Reading input cluster file configuration")
clusters, err := kubernetes.ReadConf(c.clusterFilePath)
clusters, err := k8s.ReadConf(c.clusterFilePath)
if err != nil {
return err
}
log.Info("Creating k8s clients")
err = kubernetes.InitKubernetesClients(ctx.Context, clusters)
err = k8s.InitClients(ctx.Context, clusters)
if err != nil {
return err
}

log.Info("Loading database URL")
databaseInstance, err := db.Configure()
if err != nil {
return err
}

o := sos.NewObjectStorage(exoscaleClient, clusters)
err = o.Execute(ctx.Context)
return err
o := sos.NewObjectStorage(exoscaleClient, clusters, databaseInstance)
return o.Execute(ctx.Context)
}
184 changes: 184 additions & 0 deletions src/database/database.go
@@ -0,0 +1,184 @@
package database

import (
"context"
"database/sql"
"fmt"
"github.com/appuio/appuio-cloud-reporting/pkg/db"
"github.com/jmoiron/sqlx"
"github.com/vshn/cloudscale-metrics-collector/pkg/categoriesmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/datetimesmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/discountsmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/factsmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/productsmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/queriesmodel"
"github.com/vshn/cloudscale-metrics-collector/pkg/tenantsmodel"
"os"
"strings"
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

const (
dbUrlEnvVariable = "ACR_DB_URL"
sourceQueryStorage = "object-storage-storage"
provider = "exoscale"
queryAndZone = sourceQueryStorage + ":" + provider
defaultUnit = "GBDay"
)

var (
productRow = db.Product{
Source: queryAndZone,
Target: sql.NullString{String: "1402", Valid: true},
Amount: 0.00066,
Unit: "GBDay",
During: db.InfiniteRange(),
}
discountRow = db.Discount{
Source: sourceQueryStorage,
Discount: 0,
During: db.InfiniteRange(),
}
queryRow = db.Query{
Name: queryAndZone,
Description: "Object Storage - Storage (exoscale.com)",
Query: "",
Unit: "GBDay",
During: db.InfiniteRange(),
}
)

type AggregatedBucket struct {
Organization string
// Storage in bytes
StorageUsed int64
}

type Database struct {
Url string
connection *sqlx.DB
}

// Configure loads environment variable which holds the database URL
func Configure() (database *Database, err error) {
databaseUrl := os.Getenv(dbUrlEnvVariable)
if databaseUrl == "" {
return nil, fmt.Errorf("cannot find environment variable %s", dbUrlEnvVariable)
}
return &Database{Url: databaseUrl}, nil
}

// OpenConnection opens a connection to the postgres database
func (d *Database) OpenConnection() error {
connection, err := db.Openx(d.Url)
if err != nil {
return fmt.Errorf("cannot create a connection to the database: %w", err)
}
d.connection = connection
return nil
}

// CloseConnection closes the connection to the postgres database
func (d *Database) CloseConnection() error {
err := d.connection.Close()
if err != nil {
return fmt.Errorf("cannot close database connection: %w", err)
}
return err
}

// EnsureBucketUsage saves the aggregated buckets usage by namespace to the postgresql database
// To save the correct data to the database the function also matches a relevant product, discount (if any) and query.
// The storage usage is referred to a day before the application ran (yesterday)
func (d *Database) EnsureBucketUsage(ctx context.Context, namespace string, aggregatedBucket AggregatedBucket, billingDate time.Time) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Saving buckets usage for namespace", "namespace", namespace, "storage used", aggregatedBucket.StorageUsed)

// start new transaction for actual work
tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("cannot create database transaction for namespace %s: %w", namespace, err)
}

tenant, err := tenantsmodel.Ensure(ctx, tx, &db.Tenant{Source: aggregatedBucket.Organization})
if err != nil {
return fmt.Errorf("cannot ensure organization for namespace %s: %w", namespace, err)
}

category, err := categoriesmodel.Ensure(ctx, tx, &db.Category{Source: "exoscale" + ":" + namespace})
if err != nil {
return fmt.Errorf("cannot ensure category for namespace %s: %w", namespace, err)
}

dateTime := datetimesmodel.New(billingDate)
dateTime, err = datetimesmodel.Ensure(ctx, tx, dateTime)
if err != nil {
return fmt.Errorf("cannot ensure date time for namespace %s: %w", namespace, err)
}

product, err := productsmodel.GetBestMatch(ctx, tx, getSourceString(namespace, aggregatedBucket.Organization), billingDate)
if err != nil {
return fmt.Errorf("cannot get product best match for namespace %s: %w", namespace, err)
}

discount, err := discountsmodel.GetBestMatch(ctx, tx, getSourceString(namespace, aggregatedBucket.Organization), billingDate)
if err != nil {
return fmt.Errorf("cannot get discount best match for namespace %s: %w", namespace, err)
}

query, err := queriesmodel.GetByName(ctx, tx, queryAndZone)
if err != nil {
return fmt.Errorf("cannot get query by name for namespace %s: %w", namespace, err)
}

var quantity float64
if query.Unit == defaultUnit {
quantity = float64(aggregatedBucket.StorageUsed) / 1000 / 1000 / 1000
} else {
return fmt.Errorf("unknown query unit %s", query.Unit)
}
storageFact := factsmodel.New(dateTime, query, tenant, category, product, discount, quantity)
_, err = factsmodel.Ensure(ctx, tx, storageFact)
if err != nil {
return fmt.Errorf("cannot save fact for namespace %s: %w", namespace, err)
}

err = tx.Commit()
if err != nil {
return fmt.Errorf("cannot commit transaction for buckets in namespace %s: %w", namespace, err)
}
return nil
}

// EnsureInitConfiguration ensures the minimum exoscale object storage configuration data is present in the database
// before saving buckets usage
func (d *Database) EnsureInitConfiguration(ctx context.Context) error {
tx, err := d.connection.BeginTxx(ctx, &sql.TxOptions{})
if err != nil {
return fmt.Errorf("cannot begin transaction for initial database configuration: %w", err)
}
defer tx.Rollback()
_, err = productsmodel.Ensure(ctx, tx, &productRow)
if err != nil {
return fmt.Errorf("cannot ensure exoscale product model in the database: %w", err)
}
_, err = discountsmodel.Ensure(ctx, tx, &discountRow)
if err != nil {
return fmt.Errorf("cannot ensure exoscale discount model in the database: %w", err)
}
_, err = queriesmodel.Ensure(ctx, tx, &queryRow)
if err != nil {
return fmt.Errorf("cannot ensure exoscale query model in the database: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("cannot commit transaction for initial database configuration: %w", err)
}
return nil
}

func getSourceString(namespace, organization string) string {
return strings.Join([]string{queryAndZone, organization, namespace}, ":")
}
7 changes: 4 additions & 3 deletions src/exoscale/exoscale.go
Expand Up @@ -2,8 +2,9 @@ package exoscale

import (
"fmt"
egoscale "github.com/exoscale/egoscale/v2"
"os"

egoscale "github.com/exoscale/egoscale/v2"
)

const (
Expand All @@ -27,8 +28,8 @@ func LoadAPICredentials() (exoscaleAccessKey, exoscaleSecretKey string, err erro
return APIKey, APISecret, nil
}

// CreateExoscaleClient creates exoscale client with given access and secret keys
func CreateExoscaleClient(exoscaleAccessKey, exoscaleSecretKey string) (*egoscale.Client, error) {
// InitClient creates exoscale client with given access and secret keys
func InitClient(exoscaleAccessKey, exoscaleSecretKey string) (*egoscale.Client, error) {
options := egoscale.ClientOptWithAPIEndpoint(sosEndpoint)
client, err := egoscale.NewClient(exoscaleAccessKey, exoscaleSecretKey, options)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions src/kubernetes/kubernetes.go
Expand Up @@ -8,8 +8,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

ctrl "sigs.k8s.io/controller-runtime"
)

type Cluster struct {
Expand All @@ -35,9 +36,9 @@ func ReadConf(filename string) (*[]Cluster, error) {
return &clusters, err
}

// InitKubernetesClients creates as many k8s clients as there are cluster entries from the parsed file
// InitClients creates as many k8s clients as there are cluster entries from the parsed file
// The function skips clients that cannot be initialized
func InitKubernetesClients(ctx context.Context, clusters *[]Cluster) error {
func InitClients(ctx context.Context, clusters *[]Cluster) error {
log := ctrl.LoggerFrom(ctx)
scheme := runtime.NewScheme()
err := apis.AddToScheme(scheme)
Expand Down

0 comments on commit bc9c174

Please sign in to comment.