From 73d27cf01fb4116631d9a3485bfcf4af1552312a Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Wed, 3 Nov 2021 10:45:37 +0200 Subject: [PATCH 01/15] * first commit --- go.mod | 3 +- go.sum | 6 ++ .../generic_db_to_transport_syncer.go | 93 ++++++++++++------- .../hoh_config_db_to_transport_syncer.go | 2 +- ...lacementbindings_db_to_transport_syncer.go | 2 +- .../placementrules_db_to_transport_syncer.go | 2 +- .../policies_db_to_transport_syncer.go | 2 +- .../dbsyncer/syncer_interval_policy.go | 58 ++++++++++++ 8 files changed, 128 insertions(+), 40 deletions(-) create mode 100644 pkg/controller/dbsyncer/syncer_interval_policy.go diff --git a/go.mod b/go.mod index 3cf793f..25f6565 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,8 @@ require ( github.com/go-logr/zapr v0.2.0 // indirect github.com/jackc/pgx/v4 v4.11.0 github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26 - github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 + github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 + github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect github.com/operator-framework/operator-sdk v0.19.4 diff --git a/go.sum b/go.sum index 9722f70..7433817 100644 --- a/go.sum +++ b/go.sum @@ -771,6 +771,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202 github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8= github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus= github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8= +github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0= +github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho= github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4= github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ= github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68= diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 81667ee..7011847 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -23,47 +23,41 @@ type genericDBToTransportSyncer struct { dbTableName string transport transport.Transport transportBundleKey string - syncInterval time.Duration lastUpdateTimestamp *time.Time createObjFunc bundle.CreateObjectFunction createBundleFunc bundle.CreateBundleFunction + intervalPolicy syncerIntervalPolicy } func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error { ctx, cancelContext := context.WithCancel(context.Background()) defer cancelContext() - syncer.init(ctx) - ticker := time.NewTicker(syncer.syncInterval) + syncer.init() - for { - select { - case <-stopChannel: - ticker.Stop() - cancelContext() - syncer.log.Info("stopped syncer", "table", syncer.dbTableName) + go syncer.syncBundle(ctx) - return nil - case <-ticker.C: - go syncer.syncBundle(ctx) - } - } + <-stopChannel // blocking wait for stop event + cancelContext() + syncer.log.Info("stopped syncer", "table", syncer.dbTableName) + + return nil } -func (syncer *genericDBToTransportSyncer) init(ctx context.Context) { +func (syncer *genericDBToTransportSyncer) init() { // on initialization, we initialize the lastUpdateTimestamp from the transport layer, as this is the last timestamp // that transport bridge sent an update. // later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with // transport layer and update the lastUpdateTimestamp field accordingly. timestamp := syncer.initLastUpdateTimestampFromTransport() + if timestamp != nil { syncer.lastUpdateTimestamp = timestamp } else { syncer.lastUpdateTimestamp = &time.Time{} } - syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName)) - syncer.syncBundle(ctx) + syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName)) } func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() *time.Time { @@ -81,28 +75,57 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() } func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { - lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) - if err != nil { - syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return - } + currentSyncInterval := syncer.intervalPolicy.getInterval() + ticker := time.NewTicker(currentSyncInterval) - if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - return - } - // if we got here, then the last update timestamp from db is after what we have in memory. - // this means something has changed in db, syncing to transport. - bundleResult := syncer.createBundleFunc() - lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) + for { + select { + case <-ctx.Done(): // we have received a signal to stop + ticker.Stop() + return - if err != nil { - syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return - } + case <-ticker.C: + lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) + if err != nil { + syncer.intervalPolicy.onSyncSkipped() + syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) + + return + } + + if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed + syncer.intervalPolicy.onSyncSkipped() + + return + } - syncer.lastUpdateTimestamp = lastUpdateTimestamp + // if we got here, then the last update timestamp from db is after what we have in memory. + // this means something has changed in db, syncing to transport. + bundleResult := syncer.createBundleFunc() + lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) - syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) + if err != nil { + syncer.intervalPolicy.onSyncSkipped() + syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) + + return + } + + syncer.lastUpdateTimestamp = lastUpdateTimestamp + + syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) + syncer.intervalPolicy.onSyncPerformed() + + interval := syncer.intervalPolicy.getInterval() + + // reset ticker if sync interval has changed + if interval != currentSyncInterval { + currentSyncInterval = interval + ticker.Reset(currentSyncInterval) + syncer.log.Info(fmt.Sprintf("periodic sync interval has been reset to %s", currentSyncInterval.String())) + } + } + } } func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time, diff --git a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go index 0a22277..1e845aa 100644 --- a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go @@ -26,9 +26,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr dbTableName: configTableName, transport: transport, transportBundleKey: configsMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &configv1.Config{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go index 5fbc2fa..850967b 100644 --- a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go @@ -26,9 +26,9 @@ func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSp dbTableName: placementBindingsTableName, transport: transport, transportBundleKey: placementBindingsMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} }, createBundleFunc: bundle.NewPlacementBindingBundle, + intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go index 21f6430..29a6f77 100644 --- a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go @@ -26,9 +26,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD dbTableName: placementRulesTableName, transport: transport, transportBundleKey: placementRulesMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go index 24aa333..285c9eb 100644 --- a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go @@ -26,9 +26,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra dbTableName: policiesTableName, transport: transport, transportBundleKey: policiesMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &policiesv1.Policy{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/syncer_interval_policy.go b/pkg/controller/dbsyncer/syncer_interval_policy.go new file mode 100644 index 0000000..f63119b --- /dev/null +++ b/pkg/controller/dbsyncer/syncer_interval_policy.go @@ -0,0 +1,58 @@ +package dbsyncer + +import ( + "time" +) + +const ( + maxNumOfConsecutiveSyncs = 3 + maxNumOfIncrements = 3 + floatingFactor = 2 +) + +// syncerIntervalPolicy defines a policy to return syncer "floating" interval. +type syncerIntervalPolicy interface { + onSyncPerformed() + onSyncSkipped() + getInterval() time.Duration +} + +// defaultSyncerIntervalPolicy is a default syncer "floating" interval policy. +type defaultSyncerIntervalPolicy struct { + originalInterval time.Duration + floatingInterval time.Duration + numOfConsecutiveSyncs int64 + numOfIncrements int64 +} + +func newDefaultSyncerIntervalPolicy(interval time.Duration) *defaultSyncerIntervalPolicy { + return &defaultSyncerIntervalPolicy{originalInterval: interval, floatingInterval: interval} +} + +// onSyncPerformed recalculates floating interval if sync happened. +func (policy *defaultSyncerIntervalPolicy) onSyncPerformed() { + // we've reached maximum number of interval's increments due to consecutive syncs - there is nothing to do + if policy.numOfIncrements == maxNumOfIncrements { + return + } + + policy.numOfConsecutiveSyncs++ + + // we've reached maximum number of consecutive sync - increment the interval + if policy.numOfConsecutiveSyncs == maxNumOfConsecutiveSyncs { + policy.floatingInterval *= floatingFactor + policy.numOfConsecutiveSyncs = 0 + policy.numOfIncrements++ + } +} + +// onSyncSkipped resets the entire state of the policy if sync was skipped. +func (policy *defaultSyncerIntervalPolicy) onSyncSkipped() { + policy.numOfConsecutiveSyncs = 0 + policy.numOfIncrements = 0 + policy.floatingInterval = policy.originalInterval +} + +func (policy *defaultSyncerIntervalPolicy) getInterval() time.Duration { + return policy.floatingInterval +} From 9c4f81bc25d73aea65294b26173d852f04756c3c Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Wed, 3 Nov 2021 11:19:25 +0200 Subject: [PATCH 02/15] * 'continue' fix --- pkg/controller/dbsyncer/generic_db_to_transport_syncer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 7011847..3ec5e78 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -90,13 +90,13 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.intervalPolicy.onSyncSkipped() syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return + continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed syncer.intervalPolicy.onSyncSkipped() - return + continue } // if we got here, then the last update timestamp from db is after what we have in memory. @@ -108,7 +108,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.intervalPolicy.onSyncSkipped() syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return + continue } syncer.lastUpdateTimestamp = lastUpdateTimestamp @@ -122,7 +122,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { if interval != currentSyncInterval { currentSyncInterval = interval ticker.Reset(currentSyncInterval) - syncer.log.Info(fmt.Sprintf("periodic sync interval has been reset to %s", currentSyncInterval.String())) + syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String())) } } } From 7fa6550ceb2795851ebea65b7ef7fa6f5abbfc8b Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 4 Nov 2021 09:45:52 +0200 Subject: [PATCH 03/15] * fixed ticker's interval adjustment --- .../generic_db_to_transport_syncer.go | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 3ec5e78..0543ee2 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -87,14 +87,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { case <-ticker.C: lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { - syncer.intervalPolicy.onSyncSkipped() + syncer.adjustTicker(ticker, ¤tSyncInterval, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.intervalPolicy.onSyncSkipped() + syncer.adjustTicker(ticker, ¤tSyncInterval, false) continue } @@ -105,7 +105,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) if err != nil { - syncer.intervalPolicy.onSyncSkipped() + syncer.adjustTicker(ticker, ¤tSyncInterval, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue @@ -114,17 +114,28 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.lastUpdateTimestamp = lastUpdateTimestamp syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.intervalPolicy.onSyncPerformed() + syncer.adjustTicker(ticker, ¤tSyncInterval, true) + } + } +} + +func (syncer *genericDBToTransportSyncer) adjustTicker(ticker *time.Ticker, currentSyncInterval *time.Duration, + syncPerformed bool) { + // notify policy whether sync was actually performed or skipped + if syncPerformed { + syncer.intervalPolicy.onSyncPerformed() + } else { + syncer.intervalPolicy.onSyncSkipped() + } - interval := syncer.intervalPolicy.getInterval() + // get recalculated sync interval + interval := syncer.intervalPolicy.getInterval() - // reset ticker if sync interval has changed - if interval != currentSyncInterval { - currentSyncInterval = interval - ticker.Reset(currentSyncInterval) - syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String())) - } - } + // reset ticker if sync interval has changed + if interval != *currentSyncInterval { + *currentSyncInterval = interval + ticker.Reset(*currentSyncInterval) + syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String())) } } From f50daed821dea51a2d8100fc0b0635a4972a56e7 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 4 Nov 2021 13:53:19 +0200 Subject: [PATCH 04/15] * rename method --- .../dbsyncer/generic_db_to_transport_syncer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 0543ee2..5738592 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -87,14 +87,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { case <-ticker.C: lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { - syncer.adjustTicker(ticker, ¤tSyncInterval, false) + syncer.adjustInterval(ticker, ¤tSyncInterval, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.adjustTicker(ticker, ¤tSyncInterval, false) + syncer.adjustInterval(ticker, ¤tSyncInterval, false) continue } @@ -105,7 +105,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) if err != nil { - syncer.adjustTicker(ticker, ¤tSyncInterval, false) + syncer.adjustInterval(ticker, ¤tSyncInterval, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue @@ -114,12 +114,12 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.lastUpdateTimestamp = lastUpdateTimestamp syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.adjustTicker(ticker, ¤tSyncInterval, true) + syncer.adjustInterval(ticker, ¤tSyncInterval, true) } } } -func (syncer *genericDBToTransportSyncer) adjustTicker(ticker *time.Ticker, currentSyncInterval *time.Duration, +func (syncer *genericDBToTransportSyncer) adjustInterval(ticker *time.Ticker, currentSyncInterval *time.Duration, syncPerformed bool) { // notify policy whether sync was actually performed or skipped if syncPerformed { From 6fe9d4fa04ae7450e00a3b9813c41fd115736281 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Fri, 5 Nov 2021 09:34:28 +0200 Subject: [PATCH 05/15] * rename method --- pkg/controller/dbsyncer/syncer_interval_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/dbsyncer/syncer_interval_policy.go b/pkg/controller/dbsyncer/syncer_interval_policy.go index f63119b..7bf06e6 100644 --- a/pkg/controller/dbsyncer/syncer_interval_policy.go +++ b/pkg/controller/dbsyncer/syncer_interval_policy.go @@ -38,7 +38,7 @@ func (policy *defaultSyncerIntervalPolicy) onSyncPerformed() { policy.numOfConsecutiveSyncs++ - // we've reached maximum number of consecutive sync - increment the interval + // we've reached maximum number of consecutive syncs - increment the interval if policy.numOfConsecutiveSyncs == maxNumOfConsecutiveSyncs { policy.floatingInterval *= floatingFactor policy.numOfConsecutiveSyncs = 0 From 5e9e544b91809716c9ec7a3bfe1754f95db50432 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Sun, 7 Nov 2021 10:38:18 +0200 Subject: [PATCH 06/15] * PR review fixes --- .../generic_db_to_transport_syncer.go | 37 ++++++++++--------- .../hoh_config_db_to_transport_syncer.go | 3 +- .../default_syncer_interval_policy.go} | 23 +++++------- .../intervalpolicy/syncer_interval_policy.go | 10 +++++ ...lacementbindings_db_to_transport_syncer.go | 3 +- .../placementrules_db_to_transport_syncer.go | 3 +- .../policies_db_to_transport_syncer.go | 3 +- 7 files changed, 47 insertions(+), 35 deletions(-) rename pkg/controller/dbsyncer/{syncer_interval_policy.go => intervalpolicy/default_syncer_interval_policy.go} (64%) create mode 100644 pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 5738592..623afef 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" datatypes "github.com/open-cluster-management/hub-of-hubs-data-types" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" ) @@ -26,7 +27,7 @@ type genericDBToTransportSyncer struct { lastUpdateTimestamp *time.Time createObjFunc bundle.CreateObjectFunction createBundleFunc bundle.CreateBundleFunction - intervalPolicy syncerIntervalPolicy + intervalPolicy intervalpolicy.SyncerIntervalPolicy } func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error { @@ -75,8 +76,7 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() } func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { - currentSyncInterval := syncer.intervalPolicy.getInterval() - ticker := time.NewTicker(currentSyncInterval) + ticker := time.NewTicker(syncer.intervalPolicy.GetInterval()) for { select { @@ -87,14 +87,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { case <-ticker.C: lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { - syncer.adjustInterval(ticker, ¤tSyncInterval, false) + syncer.completeSyncIteration(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.adjustInterval(ticker, ¤tSyncInterval, false) + syncer.completeSyncIteration(ticker, false) continue } @@ -105,7 +105,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) if err != nil { - syncer.adjustInterval(ticker, ¤tSyncInterval, false) + syncer.completeSyncIteration(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue @@ -114,28 +114,31 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.lastUpdateTimestamp = lastUpdateTimestamp syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.adjustInterval(ticker, ¤tSyncInterval, true) + syncer.completeSyncIteration(ticker, true) } } } -func (syncer *genericDBToTransportSyncer) adjustInterval(ticker *time.Ticker, currentSyncInterval *time.Duration, - syncPerformed bool) { +// completeSyncIteration notifies policy whether sync was actually performed or skipped and resets ticker's interval +// to a new recalculated one. +func (syncer *genericDBToTransportSyncer) completeSyncIteration(ticker *time.Ticker, syncPerformed bool) { + // get current sync interval + currentInterval := syncer.intervalPolicy.GetInterval() + // notify policy whether sync was actually performed or skipped if syncPerformed { - syncer.intervalPolicy.onSyncPerformed() + syncer.intervalPolicy.OnSyncPerformed() } else { - syncer.intervalPolicy.onSyncSkipped() + syncer.intervalPolicy.OnSyncSkipped() } // get recalculated sync interval - interval := syncer.intervalPolicy.getInterval() + recalculatedInterval := syncer.intervalPolicy.GetInterval() + + ticker.Reset(recalculatedInterval) - // reset ticker if sync interval has changed - if interval != *currentSyncInterval { - *currentSyncInterval = interval - ticker.Reset(*currentSyncInterval) - syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String())) + if currentInterval != recalculatedInterval { + syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", recalculatedInterval.String())) } } diff --git a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go index 1e845aa..37d4a0f 100644 --- a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go @@ -6,6 +6,7 @@ import ( configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,7 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr transportBundleKey: configsMsgKey, createObjFunc: func() metav1.Object { return &configv1.Config{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/syncer_interval_policy.go b/pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go similarity index 64% rename from pkg/controller/dbsyncer/syncer_interval_policy.go rename to pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go index 7bf06e6..b766b99 100644 --- a/pkg/controller/dbsyncer/syncer_interval_policy.go +++ b/pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go @@ -1,4 +1,4 @@ -package dbsyncer +package intervalpolicy import ( "time" @@ -10,13 +10,6 @@ const ( floatingFactor = 2 ) -// syncerIntervalPolicy defines a policy to return syncer "floating" interval. -type syncerIntervalPolicy interface { - onSyncPerformed() - onSyncSkipped() - getInterval() time.Duration -} - // defaultSyncerIntervalPolicy is a default syncer "floating" interval policy. type defaultSyncerIntervalPolicy struct { originalInterval time.Duration @@ -25,12 +18,13 @@ type defaultSyncerIntervalPolicy struct { numOfIncrements int64 } -func newDefaultSyncerIntervalPolicy(interval time.Duration) *defaultSyncerIntervalPolicy { +// NewDefaultSyncerIntervalPolicy creates new default syncer interval policy. +func NewDefaultSyncerIntervalPolicy(interval time.Duration) SyncerIntervalPolicy { return &defaultSyncerIntervalPolicy{originalInterval: interval, floatingInterval: interval} } -// onSyncPerformed recalculates floating interval if sync happened. -func (policy *defaultSyncerIntervalPolicy) onSyncPerformed() { +// OnSyncPerformed recalculates floating interval if sync happened. +func (policy *defaultSyncerIntervalPolicy) OnSyncPerformed() { // we've reached maximum number of interval's increments due to consecutive syncs - there is nothing to do if policy.numOfIncrements == maxNumOfIncrements { return @@ -46,13 +40,14 @@ func (policy *defaultSyncerIntervalPolicy) onSyncPerformed() { } } -// onSyncSkipped resets the entire state of the policy if sync was skipped. -func (policy *defaultSyncerIntervalPolicy) onSyncSkipped() { +// OnSyncSkipped resets the entire state of the policy if sync was skipped. +func (policy *defaultSyncerIntervalPolicy) OnSyncSkipped() { policy.numOfConsecutiveSyncs = 0 policy.numOfIncrements = 0 policy.floatingInterval = policy.originalInterval } -func (policy *defaultSyncerIntervalPolicy) getInterval() time.Duration { +// GetInterval returns recalculated interval bases on the received events. +func (policy *defaultSyncerIntervalPolicy) GetInterval() time.Duration { return policy.floatingInterval } diff --git a/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go b/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go new file mode 100644 index 0000000..d26ca8e --- /dev/null +++ b/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go @@ -0,0 +1,10 @@ +package intervalpolicy + +import "time" + +// SyncerIntervalPolicy defines a policy to return syncer interval based on the received events. +type SyncerIntervalPolicy interface { + OnSyncPerformed() + OnSyncSkipped() + GetInterval() time.Duration +} diff --git a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go index 850967b..76bf724 100644 --- a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go @@ -6,6 +6,7 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,7 @@ func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSp transportBundleKey: placementBindingsMsgKey, createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} }, createBundleFunc: bundle.NewPlacementBindingBundle, - intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go index 29a6f77..8291238 100644 --- a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go @@ -6,6 +6,7 @@ import ( appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,7 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD transportBundleKey: placementRulesMsgKey, createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go index 285c9eb..3b77e84 100644 --- a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go @@ -6,6 +6,7 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -28,7 +29,7 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra transportBundleKey: policiesMsgKey, createObjFunc: func() metav1.Object { return &policiesv1.Policy{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } From 7412c86ac8e493fbd0c77985c675ba4cd5aa69a7 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Sun, 7 Nov 2021 16:14:03 +0200 Subject: [PATCH 07/15] * PR review fixes --- .../dbsyncer/generic_db_to_transport_syncer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 623afef..0683bf4 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -87,14 +87,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { case <-ticker.C: lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { - syncer.completeSyncIteration(ticker, false) + syncer.completeIteration(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.completeSyncIteration(ticker, false) + syncer.completeIteration(ticker, false) continue } @@ -105,7 +105,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) if err != nil { - syncer.completeSyncIteration(ticker, false) + syncer.completeIteration(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue @@ -114,14 +114,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.lastUpdateTimestamp = lastUpdateTimestamp syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.completeSyncIteration(ticker, true) + syncer.completeIteration(ticker, true) } } } -// completeSyncIteration notifies policy whether sync was actually performed or skipped and resets ticker's interval +// completeIteration notifies policy whether sync was actually performed or skipped and resets ticker's interval // to a new recalculated one. -func (syncer *genericDBToTransportSyncer) completeSyncIteration(ticker *time.Ticker, syncPerformed bool) { +func (syncer *genericDBToTransportSyncer) completeIteration(ticker *time.Ticker, syncPerformed bool) { // get current sync interval currentInterval := syncer.intervalPolicy.GetInterval() From 9b295033d9d01ddbf81615a1846bd0469af9fcab Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Mon, 8 Nov 2021 08:37:41 +0200 Subject: [PATCH 08/15] * PR review fixes --- .../dbsyncer/generic_db_to_transport_syncer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 0683bf4..c65eddc 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -87,14 +87,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { case <-ticker.C: lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { - syncer.completeIteration(ticker, false) + syncer.completeSync(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.completeIteration(ticker, false) + syncer.completeSync(ticker, false) continue } @@ -105,7 +105,7 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) if err != nil { - syncer.completeIteration(ticker, false) + syncer.completeSync(ticker, false) syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) continue @@ -114,14 +114,14 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { syncer.lastUpdateTimestamp = lastUpdateTimestamp syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.completeIteration(ticker, true) + syncer.completeSync(ticker, true) } } } -// completeIteration notifies policy whether sync was actually performed or skipped and resets ticker's interval +// completeSync notifies policy whether sync was actually performed or skipped and resets ticker's interval // to a new recalculated one. -func (syncer *genericDBToTransportSyncer) completeIteration(ticker *time.Ticker, syncPerformed bool) { +func (syncer *genericDBToTransportSyncer) completeSync(ticker *time.Ticker, syncPerformed bool) { // get current sync interval currentInterval := syncer.intervalPolicy.GetInterval() From 9ab96585ec92ccc29b3d5f9a3a7aeb5116951dd3 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 10:27:28 +0200 Subject: [PATCH 09/15] * PR review fixes --- go.mod | 1 + go.sum | 3 + .../generic_db_to_transport_syncer.go | 104 +++++++++--------- .../hoh_config_db_to_transport_syncer.go | 4 +- .../default_syncer_interval_policy.go | 53 --------- .../intervalpolicy/syncer_interval_policy.go | 10 -- ...lacementbindings_db_to_transport_syncer.go | 4 +- .../placementrules_db_to_transport_syncer.go | 4 +- .../policies_db_to_transport_syncer.go | 4 +- .../exponential_backoff_interval_policy.go | 38 +++++++ pkg/intervalpolicy/interval_policy.go | 10 ++ 11 files changed, 111 insertions(+), 124 deletions(-) delete mode 100644 pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go delete mode 100644 pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go create mode 100644 pkg/intervalpolicy/exponential_backoff_interval_policy.go create mode 100644 pkg/intervalpolicy/interval_policy.go diff --git a/go.mod b/go.mod index 25f6565..5ade2e8 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge go 1.16 require ( + github.com/cenkalti/backoff/v4 v4.1.1 github.com/go-logr/logr v0.2.1 github.com/go-logr/zapr v0.2.0 // indirect github.com/jackc/pgx/v4 v4.11.0 diff --git a/go.sum b/go.sum index 7433817..e49a01c 100644 --- a/go.sum +++ b/go.sum @@ -147,7 +147,10 @@ github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= +github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index c65eddc..0dc66b6 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -9,8 +9,8 @@ import ( "github.com/go-logr/logr" datatypes "github.com/open-cluster-management/hub-of-hubs-data-types" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" - "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" ) @@ -27,16 +27,16 @@ type genericDBToTransportSyncer struct { lastUpdateTimestamp *time.Time createObjFunc bundle.CreateObjectFunction createBundleFunc bundle.CreateBundleFunction - intervalPolicy intervalpolicy.SyncerIntervalPolicy + intervalPolicy intervalpolicy.IntervalPolicy } func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error { ctx, cancelContext := context.WithCancel(context.Background()) defer cancelContext() - syncer.init() + syncer.init(ctx) - go syncer.syncBundle(ctx) + go syncer.periodicSync(ctx) <-stopChannel // blocking wait for stop event cancelContext() @@ -45,7 +45,7 @@ func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) err return nil } -func (syncer *genericDBToTransportSyncer) init() { +func (syncer *genericDBToTransportSyncer) init(ctx context.Context) { // on initialization, we initialize the lastUpdateTimestamp from the transport layer, as this is the last timestamp // that transport bridge sent an update. // later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with @@ -59,6 +59,7 @@ func (syncer *genericDBToTransportSyncer) init() { } syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName)) + syncer.syncBundle(ctx) } func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() *time.Time { @@ -75,7 +76,37 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() return ×tamp } -func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { +// syncBundle performs the actual sync logic and returns true if bundle was committed to transport, otherwise false. +func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) bool { + lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) + if err != nil { + syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) + + return false + } + + if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed + return false + } + + // if we got here, then the last update timestamp from db is after what we have in memory. + // this means something has changed in db, syncing to transport. + bundleResult := syncer.createBundleFunc() + lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) + + if err != nil { + syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) + + return false + } + + syncer.lastUpdateTimestamp = lastUpdateTimestamp + syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) + + return true +} + +func (syncer *genericDBToTransportSyncer) periodicSync(ctx context.Context) { ticker := time.NewTicker(syncer.intervalPolicy.GetInterval()) for { @@ -85,63 +116,30 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { return case <-ticker.C: - lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) - if err != nil { - syncer.completeSync(ticker, false) - syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) + synced := syncer.syncBundle(ctx) - continue - } - - if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - syncer.completeSync(ticker, false) + // get current sync interval + currentInterval := syncer.intervalPolicy.GetInterval() - continue + // notify policy whether sync was actually performed or skipped + if synced { + syncer.intervalPolicy.Evaluate() + } else { + syncer.intervalPolicy.Reset() } - // if we got here, then the last update timestamp from db is after what we have in memory. - // this means something has changed in db, syncing to transport. - bundleResult := syncer.createBundleFunc() - lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult) + // get reevaluated sync interval + reevaluatedInterval := syncer.intervalPolicy.GetInterval() - if err != nil { - syncer.completeSync(ticker, false) - syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - - continue + // reset ticker if needed + if currentInterval != reevaluatedInterval { + ticker.Reset(reevaluatedInterval) + syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", reevaluatedInterval.String())) } - - syncer.lastUpdateTimestamp = lastUpdateTimestamp - - syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) - syncer.completeSync(ticker, true) } } } -// completeSync notifies policy whether sync was actually performed or skipped and resets ticker's interval -// to a new recalculated one. -func (syncer *genericDBToTransportSyncer) completeSync(ticker *time.Ticker, syncPerformed bool) { - // get current sync interval - currentInterval := syncer.intervalPolicy.GetInterval() - - // notify policy whether sync was actually performed or skipped - if syncPerformed { - syncer.intervalPolicy.OnSyncPerformed() - } else { - syncer.intervalPolicy.OnSyncSkipped() - } - - // get recalculated sync interval - recalculatedInterval := syncer.intervalPolicy.GetInterval() - - ticker.Reset(recalculatedInterval) - - if currentInterval != recalculatedInterval { - syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", recalculatedInterval.String())) - } -} - func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time, payload bundle.Bundle) { payloadBytes, err := json.Marshal(payload) diff --git a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go index 37d4a0f..b35f8ca 100644 --- a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go @@ -6,8 +6,8 @@ import ( configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" - "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -29,7 +29,7 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr transportBundleKey: configsMsgKey, createObjFunc: func() metav1.Object { return &configv1.Config{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go b/pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go deleted file mode 100644 index b766b99..0000000 --- a/pkg/controller/dbsyncer/intervalpolicy/default_syncer_interval_policy.go +++ /dev/null @@ -1,53 +0,0 @@ -package intervalpolicy - -import ( - "time" -) - -const ( - maxNumOfConsecutiveSyncs = 3 - maxNumOfIncrements = 3 - floatingFactor = 2 -) - -// defaultSyncerIntervalPolicy is a default syncer "floating" interval policy. -type defaultSyncerIntervalPolicy struct { - originalInterval time.Duration - floatingInterval time.Duration - numOfConsecutiveSyncs int64 - numOfIncrements int64 -} - -// NewDefaultSyncerIntervalPolicy creates new default syncer interval policy. -func NewDefaultSyncerIntervalPolicy(interval time.Duration) SyncerIntervalPolicy { - return &defaultSyncerIntervalPolicy{originalInterval: interval, floatingInterval: interval} -} - -// OnSyncPerformed recalculates floating interval if sync happened. -func (policy *defaultSyncerIntervalPolicy) OnSyncPerformed() { - // we've reached maximum number of interval's increments due to consecutive syncs - there is nothing to do - if policy.numOfIncrements == maxNumOfIncrements { - return - } - - policy.numOfConsecutiveSyncs++ - - // we've reached maximum number of consecutive syncs - increment the interval - if policy.numOfConsecutiveSyncs == maxNumOfConsecutiveSyncs { - policy.floatingInterval *= floatingFactor - policy.numOfConsecutiveSyncs = 0 - policy.numOfIncrements++ - } -} - -// OnSyncSkipped resets the entire state of the policy if sync was skipped. -func (policy *defaultSyncerIntervalPolicy) OnSyncSkipped() { - policy.numOfConsecutiveSyncs = 0 - policy.numOfIncrements = 0 - policy.floatingInterval = policy.originalInterval -} - -// GetInterval returns recalculated interval bases on the received events. -func (policy *defaultSyncerIntervalPolicy) GetInterval() time.Duration { - return policy.floatingInterval -} diff --git a/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go b/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go deleted file mode 100644 index d26ca8e..0000000 --- a/pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go +++ /dev/null @@ -1,10 +0,0 @@ -package intervalpolicy - -import "time" - -// SyncerIntervalPolicy defines a policy to return syncer interval based on the received events. -type SyncerIntervalPolicy interface { - OnSyncPerformed() - OnSyncSkipped() - GetInterval() time.Duration -} diff --git a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go index 76bf724..4fbab60 100644 --- a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go @@ -6,8 +6,8 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" - "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -29,7 +29,7 @@ func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSp transportBundleKey: placementBindingsMsgKey, createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} }, createBundleFunc: bundle.NewPlacementBindingBundle, - intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go index 8291238..35b7347 100644 --- a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go @@ -6,8 +6,8 @@ import ( appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" - "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -29,7 +29,7 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD transportBundleKey: placementRulesMsgKey, createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go index 3b77e84..6b06a7f 100644 --- a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go @@ -6,8 +6,8 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" - "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -29,7 +29,7 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra transportBundleKey: policiesMsgKey, createObjFunc: func() metav1.Object { return &policiesv1.Policy{} }, createBundleFunc: bundle.NewBaseBundle, - intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval), + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/intervalpolicy/exponential_backoff_interval_policy.go b/pkg/intervalpolicy/exponential_backoff_interval_policy.go new file mode 100644 index 0000000..93683e1 --- /dev/null +++ b/pkg/intervalpolicy/exponential_backoff_interval_policy.go @@ -0,0 +1,38 @@ +package intervalpolicy + +import ( + "time" + + "github.com/cenkalti/backoff/v4" +) + +// exponentialBackoffIntervalPolicy is a default interval policy. +type exponentialBackoffIntervalPolicy struct { + exponentialBackoff *backoff.ExponentialBackOff + interval time.Duration +} + +// NewExponentialBackoffIntervalPolicy creates new exponential backoff interval policy. +func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy { + intervalPolicy := &exponentialBackoffIntervalPolicy{exponentialBackoff: backoff.NewExponentialBackOff()} + + intervalPolicy.exponentialBackoff.InitialInterval = interval + intervalPolicy.interval = interval + + return intervalPolicy +} + +// Evaluate reevaluates interval. +func (policy *exponentialBackoffIntervalPolicy) Evaluate() { + policy.interval = policy.exponentialBackoff.NextBackOff() +} + +// Reset resets the entire state of the policy. +func (policy *exponentialBackoffIntervalPolicy) Reset() { + policy.exponentialBackoff.Reset() +} + +// GetInterval returns reevaluated interval. +func (policy *exponentialBackoffIntervalPolicy) GetInterval() time.Duration { + return policy.interval +} diff --git a/pkg/intervalpolicy/interval_policy.go b/pkg/intervalpolicy/interval_policy.go new file mode 100644 index 0000000..fcf1f0e --- /dev/null +++ b/pkg/intervalpolicy/interval_policy.go @@ -0,0 +1,10 @@ +package intervalpolicy + +import "time" + +// IntervalPolicy defines a policy to return interval based on the received events. +type IntervalPolicy interface { + Evaluate() + Reset() + GetInterval() time.Duration +} From 226ff5b322bf30e6d585f6eb1a420486455f4c71 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 11:07:49 +0200 Subject: [PATCH 10/15] * PR review fixes --- .../dbsyncer/placementbindings_db_to_transport_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go index 4fbab60..7e2debc 100644 --- a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go @@ -22,7 +22,7 @@ const ( func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, transport transport.Transport, syncInterval time.Duration) error { if err := mgr.Add(&genericDBToTransportSyncer{ - log: ctrl.Log.WithName("policy-db-to-transport-syncer"), + log: ctrl.Log.WithName("placement-bindings-db-to-transport-syncer"), db: db, dbTableName: placementBindingsTableName, transport: transport, From 7d74905b7c86ebde59a3acfa5e135ce1e4c50e0c Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 15:15:47 +0200 Subject: [PATCH 11/15] * PR review fixes --- .../exponential_backoff_interval_policy.go | 42 +++++++++++++++---- pkg/intervalpolicy/interval_policy.go | 3 ++ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/pkg/intervalpolicy/exponential_backoff_interval_policy.go b/pkg/intervalpolicy/exponential_backoff_interval_policy.go index 93683e1..50332e9 100644 --- a/pkg/intervalpolicy/exponential_backoff_interval_policy.go +++ b/pkg/intervalpolicy/exponential_backoff_interval_policy.go @@ -6,30 +6,54 @@ import ( "github.com/cenkalti/backoff/v4" ) +const ( + baseFactor = 2 + maxInterval = 60 * time.Second + maxNumOfConsecutiveEvaluationsBeforeNextBackoff = 3 +) + // exponentialBackoffIntervalPolicy is a default interval policy. type exponentialBackoffIntervalPolicy struct { - exponentialBackoff *backoff.ExponentialBackOff - interval time.Duration + exponentialBackoff *backoff.ExponentialBackOff + interval time.Duration + numOfConsecutiveEvaluations int } // NewExponentialBackoffIntervalPolicy creates new exponential backoff interval policy. func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy { - intervalPolicy := &exponentialBackoffIntervalPolicy{exponentialBackoff: backoff.NewExponentialBackOff()} - - intervalPolicy.exponentialBackoff.InitialInterval = interval - intervalPolicy.interval = interval - - return intervalPolicy + exponentialBackoff := &backoff.ExponentialBackOff{ + InitialInterval: interval, + RandomizationFactor: 0, + Multiplier: baseFactor, + MaxInterval: maxInterval, + MaxElapsedTime: 0, + Stop: 0, + Clock: backoff.SystemClock, + } + + exponentialBackoff.Reset() + + return &exponentialBackoffIntervalPolicy{ + exponentialBackoff: exponentialBackoff, + interval: interval, + } } // Evaluate reevaluates interval. func (policy *exponentialBackoffIntervalPolicy) Evaluate() { - policy.interval = policy.exponentialBackoff.NextBackOff() + policy.numOfConsecutiveEvaluations++ + + if policy.numOfConsecutiveEvaluations == maxNumOfConsecutiveEvaluationsBeforeNextBackoff { + policy.interval = policy.exponentialBackoff.NextBackOff() + policy.numOfConsecutiveEvaluations = 0 + } } // Reset resets the entire state of the policy. func (policy *exponentialBackoffIntervalPolicy) Reset() { policy.exponentialBackoff.Reset() + policy.interval = policy.exponentialBackoff.InitialInterval + policy.numOfConsecutiveEvaluations = 0 } // GetInterval returns reevaluated interval. diff --git a/pkg/intervalpolicy/interval_policy.go b/pkg/intervalpolicy/interval_policy.go index fcf1f0e..74a27c6 100644 --- a/pkg/intervalpolicy/interval_policy.go +++ b/pkg/intervalpolicy/interval_policy.go @@ -4,7 +4,10 @@ import "time" // IntervalPolicy defines a policy to return interval based on the received events. type IntervalPolicy interface { + // Evaluate evaluates next interval. Evaluate() + // Reset resets the interval of the interval policy. Reset() + // GetInterval returns current interval. GetInterval() time.Duration } From fa46f0be0f353c20aabcc2a09564c158204dc9b7 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 18:26:05 +0200 Subject: [PATCH 12/15] * PR review fixes --- pkg/intervalpolicy/exponential_backoff_interval_policy.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/intervalpolicy/exponential_backoff_interval_policy.go b/pkg/intervalpolicy/exponential_backoff_interval_policy.go index 50332e9..e614c1c 100644 --- a/pkg/intervalpolicy/exponential_backoff_interval_policy.go +++ b/pkg/intervalpolicy/exponential_backoff_interval_policy.go @@ -34,8 +34,9 @@ func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy exponentialBackoff.Reset() return &exponentialBackoffIntervalPolicy{ - exponentialBackoff: exponentialBackoff, - interval: interval, + exponentialBackoff: exponentialBackoff, + interval: interval, + numOfConsecutiveEvaluations: 0, } } From d24fd916a3144a37c32b5e9248a6b993e54cf5c7 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 20:53:32 +0200 Subject: [PATCH 13/15] * use backoff v4.1.2 --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 5ade2e8..c6bda2f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge go 1.16 require ( - github.com/cenkalti/backoff/v4 v4.1.1 + github.com/cenkalti/backoff/v4 v4.1.2 github.com/go-logr/logr v0.2.1 github.com/go-logr/zapr v0.2.0 // indirect github.com/jackc/pgx/v4 v4.11.0 diff --git a/go.sum b/go.sum index e49a01c..d6901a6 100644 --- a/go.sum +++ b/go.sum @@ -151,6 +151,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEe github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= From 10de04993f615df65c4a3993ef64d4ad39c5c90c Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 21:25:18 +0200 Subject: [PATCH 14/15] * use backoff v4.1.3 with specific commit id to include Nir R changes --- go.mod | 3 ++- go.sum | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index c6bda2f..e602066 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,8 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge go 1.16 require ( - github.com/cenkalti/backoff/v4 v4.1.2 + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 github.com/go-logr/logr v0.2.1 github.com/go-logr/zapr v0.2.0 // indirect github.com/jackc/pgx/v4 v4.11.0 diff --git a/go.sum b/go.sum index d6901a6..1c77b01 100644 --- a/go.sum +++ b/go.sum @@ -153,6 +153,8 @@ github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1P github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 h1:b1xN+oryGwKBsSdQo2wizfReaiLRhZRzb6BtjLZB/P0= +github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= From 7fd59d3b4aed8cd5a9b114517a416795c9f35b20 Mon Sep 17 00:00:00 2001 From: olegsternberg Date: Thu, 11 Nov 2021 21:28:27 +0200 Subject: [PATCH 15/15] * use backoff v4.1.3 with specific commit id to include Nir R changes --- go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/go.mod b/go.mod index e602066..be104ce 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,6 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge go 1.16 require ( - github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 github.com/go-logr/logr v0.2.1 github.com/go-logr/zapr v0.2.0 // indirect