Skip to content

Commit

Permalink
Floating sync intervals (#9)
Browse files Browse the repository at this point in the history
* * first commit

* * 'continue' fix

* * fixed ticker's interval adjustment

* * rename method

* * rename method

* * PR review fixes

* * PR review fixes

* * PR review fixes

* * PR review fixes

* * PR review fixes

* * PR review fixes

* * PR review fixes

* * use backoff v4.1.2

* * use backoff v4.1.3 with specific commit id to include Nir R changes

* * use backoff v4.1.3 with specific commit id to include Nir R changes
  • Loading branch information
OlegSternbergIBM committed Nov 12, 2021
1 parent d915ca9 commit f7c4315
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 25 deletions.
4 changes: 3 additions & 1 deletion go.mod
Expand Up @@ -3,11 +3,13 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge
go 1.16

require (
github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65
github.com/go-logr/logr v0.2.1
github.com/go-logr/zapr v0.2.0 // indirect
github.com/jackc/pgx/v4 v4.11.0
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect
github.com/operator-framework/operator-sdk v0.19.4
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Expand Up @@ -147,7 +147,14 @@ github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR
github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 h1:b1xN+oryGwKBsSdQo2wizfReaiLRhZRzb6BtjLZB/P0=
github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
Expand Down Expand Up @@ -771,6 +778,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ=
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68=
Expand Down
73 changes: 54 additions & 19 deletions pkg/controller/dbsyncer/generic_db_to_transport_syncer.go
Expand Up @@ -10,6 +10,7 @@ import (
datatypes "github.com/open-cluster-management/hub-of-hubs-data-types"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
)

Expand All @@ -23,31 +24,25 @@ type genericDBToTransportSyncer struct {
dbTableName string
transport transport.Transport
transportBundleKey string
syncInterval time.Duration
lastUpdateTimestamp *time.Time
createObjFunc bundle.CreateObjectFunction
createBundleFunc bundle.CreateBundleFunction
intervalPolicy intervalpolicy.IntervalPolicy
}

func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()

syncer.init(ctx)
ticker := time.NewTicker(syncer.syncInterval)

for {
select {
case <-stopChannel:
ticker.Stop()
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)
go syncer.periodicSync(ctx)

return nil
case <-ticker.C:
go syncer.syncBundle(ctx)
}
}
<-stopChannel // blocking wait for stop event
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)

return nil
}

func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
Expand All @@ -56,13 +51,14 @@ func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
// later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with
// transport layer and update the lastUpdateTimestamp field accordingly.
timestamp := syncer.initLastUpdateTimestampFromTransport()

if timestamp != nil {
syncer.lastUpdateTimestamp = timestamp
} else {
syncer.lastUpdateTimestamp = &time.Time{}
}

syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.syncBundle(ctx)
}

Expand All @@ -80,29 +76,68 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport()
return &timestamp
}

func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) {
// syncBundle performs the actual sync logic and returns true if bundle was committed to transport, otherwise false.
func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) bool {
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return

return false
}

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
return
return false
}

// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)

if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return

return false
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)

return true
}

func (syncer *genericDBToTransportSyncer) periodicSync(ctx context.Context) {
ticker := time.NewTicker(syncer.intervalPolicy.GetInterval())

for {
select {
case <-ctx.Done(): // we have received a signal to stop
ticker.Stop()
return

case <-ticker.C:
synced := syncer.syncBundle(ctx)

// get current sync interval
currentInterval := syncer.intervalPolicy.GetInterval()

// notify policy whether sync was actually performed or skipped
if synced {
syncer.intervalPolicy.Evaluate()
} else {
syncer.intervalPolicy.Reset()
}

// get reevaluated sync interval
reevaluatedInterval := syncer.intervalPolicy.GetInterval()

// reset ticker if needed
if currentInterval != reevaluatedInterval {
ticker.Reset(reevaluatedInterval)
syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", reevaluatedInterval.String()))
}
}
}
}

func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go
Expand Up @@ -7,6 +7,7 @@ import (
configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr
dbTableName: configTableName,
transport: transport,
transportBundleKey: configsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &configv1.Config{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -7,6 +7,7 @@ import (
policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -21,14 +22,14 @@ const (
func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, transport transport.Transport,
syncInterval time.Duration) error {
if err := mgr.Add(&genericDBToTransportSyncer{
log: ctrl.Log.WithName("policy-db-to-transport-syncer"),
log: ctrl.Log.WithName("placement-bindings-db-to-transport-syncer"),
db: db,
dbTableName: placementBindingsTableName,
transport: transport,
transportBundleKey: placementBindingsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} },
createBundleFunc: bundle.NewPlacementBindingBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -7,6 +7,7 @@ import (
appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD
dbTableName: placementRulesTableName,
transport: transport,
transportBundleKey: placementRulesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/policies_db_to_transport_syncer.go
Expand Up @@ -7,6 +7,7 @@ import (
policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -26,9 +27,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra
dbTableName: policiesTableName,
transport: transport,
transportBundleKey: policiesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.Policy{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/intervalpolicy/exponential_backoff_interval_policy.go
@@ -0,0 +1,63 @@
package intervalpolicy

import (
"time"

"github.com/cenkalti/backoff/v4"
)

const (
baseFactor = 2
maxInterval = 60 * time.Second
maxNumOfConsecutiveEvaluationsBeforeNextBackoff = 3
)

// exponentialBackoffIntervalPolicy is a default interval policy.
type exponentialBackoffIntervalPolicy struct {
exponentialBackoff *backoff.ExponentialBackOff
interval time.Duration
numOfConsecutiveEvaluations int
}

// NewExponentialBackoffIntervalPolicy creates new exponential backoff interval policy.
func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy {
exponentialBackoff := &backoff.ExponentialBackOff{
InitialInterval: interval,
RandomizationFactor: 0,
Multiplier: baseFactor,
MaxInterval: maxInterval,
MaxElapsedTime: 0,
Stop: 0,
Clock: backoff.SystemClock,
}

exponentialBackoff.Reset()

return &exponentialBackoffIntervalPolicy{
exponentialBackoff: exponentialBackoff,
interval: interval,
numOfConsecutiveEvaluations: 0,
}
}

// Evaluate reevaluates interval.
func (policy *exponentialBackoffIntervalPolicy) Evaluate() {
policy.numOfConsecutiveEvaluations++

if policy.numOfConsecutiveEvaluations == maxNumOfConsecutiveEvaluationsBeforeNextBackoff {
policy.interval = policy.exponentialBackoff.NextBackOff()
policy.numOfConsecutiveEvaluations = 0
}
}

// Reset resets the entire state of the policy.
func (policy *exponentialBackoffIntervalPolicy) Reset() {
policy.exponentialBackoff.Reset()
policy.interval = policy.exponentialBackoff.InitialInterval
policy.numOfConsecutiveEvaluations = 0
}

// GetInterval returns reevaluated interval.
func (policy *exponentialBackoffIntervalPolicy) GetInterval() time.Duration {
return policy.interval
}
13 changes: 13 additions & 0 deletions pkg/intervalpolicy/interval_policy.go
@@ -0,0 +1,13 @@
package intervalpolicy

import "time"

// IntervalPolicy defines a policy to return interval based on the received events.
type IntervalPolicy interface {
// Evaluate evaluates next interval.
Evaluate()
// Reset resets the interval of the interval policy.
Reset()
// GetInterval returns current interval.
GetInterval() time.Duration
}

0 comments on commit f7c4315

Please sign in to comment.