Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Floating sync intervals #9

Merged
merged 15 commits into from Nov 12, 2021
Merged
4 changes: 3 additions & 1 deletion go.mod
Expand Up @@ -3,11 +3,13 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge
go 1.16

require (
github.com/cenkalti/backoff/v4 v4.1.1
github.com/go-logr/logr v0.2.1
github.com/go-logr/zapr v0.2.0 // indirect
github.com/jackc/pgx/v4 v4.11.0
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect
github.com/operator-framework/operator-sdk v0.19.4
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Expand Up @@ -147,7 +147,10 @@ github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
Expand Down Expand Up @@ -771,6 +774,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ=
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68=
Expand Down
73 changes: 54 additions & 19 deletions pkg/controller/dbsyncer/generic_db_to_transport_syncer.go
Expand Up @@ -10,6 +10,7 @@ import (
datatypes "github.com/open-cluster-management/hub-of-hubs-data-types"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
)

Expand All @@ -23,31 +24,25 @@ type genericDBToTransportSyncer struct {
dbTableName string
transport transport.Transport
transportBundleKey string
syncInterval time.Duration
lastUpdateTimestamp *time.Time
createObjFunc bundle.CreateObjectFunction
createBundleFunc bundle.CreateBundleFunction
intervalPolicy intervalpolicy.IntervalPolicy
}

func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()

syncer.init(ctx)
ticker := time.NewTicker(syncer.syncInterval)

for {
select {
case <-stopChannel:
ticker.Stop()
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)
go syncer.periodicSync(ctx)

return nil
case <-ticker.C:
go syncer.syncBundle(ctx)
}
}
<-stopChannel // blocking wait for stop event
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)

return nil
}

func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
Expand All @@ -56,13 +51,14 @@ func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
// later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with
// transport layer and update the lastUpdateTimestamp field accordingly.
timestamp := syncer.initLastUpdateTimestampFromTransport()

if timestamp != nil {
syncer.lastUpdateTimestamp = timestamp
} else {
syncer.lastUpdateTimestamp = &time.Time{}
}

syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.syncBundle(ctx)
}

Expand All @@ -80,29 +76,68 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport()
return &timestamp
}

func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) {
// syncBundle performs the actual sync logic and returns true if bundle was committed to transport, otherwise false.
func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) bool {
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return

return false
}

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
return
return false
}

// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)

if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return

return false
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)

return true
}

func (syncer *genericDBToTransportSyncer) periodicSync(ctx context.Context) {
ticker := time.NewTicker(syncer.intervalPolicy.GetInterval())

for {
select {
case <-ctx.Done(): // we have received a signal to stop
ticker.Stop()
return

case <-ticker.C:
synced := syncer.syncBundle(ctx)

// get current sync interval
currentInterval := syncer.intervalPolicy.GetInterval()

// notify policy whether sync was actually performed or skipped
if synced {
syncer.intervalPolicy.Evaluate()
} else {
syncer.intervalPolicy.Reset()
}

// get reevaluated sync interval
reevaluatedInterval := syncer.intervalPolicy.GetInterval()

// reset ticker if needed
if currentInterval != reevaluatedInterval {
ticker.Reset(reevaluatedInterval)
syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", reevaluatedInterval.String()))
}
}
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go
Expand Up @@ -7,6 +7,7 @@ import (
configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr
dbTableName: configTableName,
transport: transport,
transportBundleKey: configsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &configv1.Config{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -7,6 +7,7 @@ import (
policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -21,14 +22,14 @@ const (
func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, transport transport.Transport,
syncInterval time.Duration) error {
if err := mgr.Add(&genericDBToTransportSyncer{
log: ctrl.Log.WithName("policy-db-to-transport-syncer"),
log: ctrl.Log.WithName("placement-bindings-db-to-transport-syncer"),
db: db,
dbTableName: placementBindingsTableName,
transport: transport,
transportBundleKey: placementBindingsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} },
createBundleFunc: bundle.NewPlacementBindingBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -7,6 +7,7 @@ import (
appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD
dbTableName: placementRulesTableName,
transport: transport,
transportBundleKey: placementRulesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/policies_db_to_transport_syncer.go
Expand Up @@ -7,6 +7,7 @@ import (
policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra
dbTableName: policiesTableName,
transport: transport,
transportBundleKey: policiesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.Policy{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
38 changes: 38 additions & 0 deletions pkg/intervalpolicy/exponential_backoff_interval_policy.go
@@ -0,0 +1,38 @@
package intervalpolicy

import (
"time"

"github.com/cenkalti/backoff/v4"
)

// exponentialBackoffIntervalPolicy is a default interval policy.
type exponentialBackoffIntervalPolicy struct {
exponentialBackoff *backoff.ExponentialBackOff
interval time.Duration
}

// NewExponentialBackoffIntervalPolicy creates new exponential backoff interval policy.
func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy {
intervalPolicy := &exponentialBackoffIntervalPolicy{exponentialBackoff: backoff.NewExponentialBackOff()}
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

intervalPolicy.exponentialBackoff.InitialInterval = interval
intervalPolicy.interval = interval

return intervalPolicy
}

// Evaluate reevaluates interval.
func (policy *exponentialBackoffIntervalPolicy) Evaluate() {
policy.interval = policy.exponentialBackoff.NextBackOff()
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
}

// Reset resets the entire state of the policy.
func (policy *exponentialBackoffIntervalPolicy) Reset() {
policy.exponentialBackoff.Reset()
}
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

// GetInterval returns reevaluated interval.
func (policy *exponentialBackoffIntervalPolicy) GetInterval() time.Duration {
return policy.interval
}
10 changes: 10 additions & 0 deletions pkg/intervalpolicy/interval_policy.go
@@ -0,0 +1,10 @@
package intervalpolicy

import "time"

// IntervalPolicy defines a policy to return interval based on the received events.
type IntervalPolicy interface {
Evaluate()
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
Reset()
GetInterval() time.Duration
}