diff --git a/go.mod b/go.mod index 3cf793f..be104ce 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,13 @@ module github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge go 1.16 require ( + github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 github.com/go-logr/logr v0.2.1 github.com/go-logr/zapr v0.2.0 // indirect github.com/jackc/pgx/v4 v4.11.0 github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26 - github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 + github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 + github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect github.com/operator-framework/operator-sdk v0.19.4 diff --git a/go.sum b/go.sum index 9722f70..1c77b01 100644 --- a/go.sum +++ b/go.sum @@ -147,7 +147,14 @@ github.com/bugsnag/panicwrap v1.2.0/go.mod h1:D/8v3kj0zr8ZAKg1AQ6crr+5VwKN5eIywR github.com/campoy/embedmd v1.0.0/go.mod h1:oxyr9RCiSXg0M3VJ3ks0UGfp98BpSSGr0kpiX3MzVl8= github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ= github.com/cenkalti/backoff v0.0.0-20181003080854-62661b46c409/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ= +github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= +github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65 h1:b1xN+oryGwKBsSdQo2wizfReaiLRhZRzb6BtjLZB/P0= +github.com/cenkalti/backoff/v4 v4.1.3-0.20211111164109-6b0e4ad0cd65/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= @@ -771,6 +778,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202 github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8= github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus= github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8= +github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0= +github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA= +github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho= github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4= github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ= github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68= diff --git a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go index 81667ee..0dc66b6 100644 --- a/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/generic_db_to_transport_syncer.go @@ -10,6 +10,7 @@ import ( datatypes "github.com/open-cluster-management/hub-of-hubs-data-types" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" ) @@ -23,10 +24,10 @@ type genericDBToTransportSyncer struct { dbTableName string transport transport.Transport transportBundleKey string - syncInterval time.Duration lastUpdateTimestamp *time.Time createObjFunc bundle.CreateObjectFunction createBundleFunc bundle.CreateBundleFunction + intervalPolicy intervalpolicy.IntervalPolicy } func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error { @@ -34,20 +35,14 @@ func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) err defer cancelContext() syncer.init(ctx) - ticker := time.NewTicker(syncer.syncInterval) - for { - select { - case <-stopChannel: - ticker.Stop() - cancelContext() - syncer.log.Info("stopped syncer", "table", syncer.dbTableName) + go syncer.periodicSync(ctx) - return nil - case <-ticker.C: - go syncer.syncBundle(ctx) - } - } + <-stopChannel // blocking wait for stop event + cancelContext() + syncer.log.Info("stopped syncer", "table", syncer.dbTableName) + + return nil } func (syncer *genericDBToTransportSyncer) init(ctx context.Context) { @@ -56,13 +51,14 @@ func (syncer *genericDBToTransportSyncer) init(ctx context.Context) { // later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with // transport layer and update the lastUpdateTimestamp field accordingly. timestamp := syncer.initLastUpdateTimestampFromTransport() + if timestamp != nil { syncer.lastUpdateTimestamp = timestamp } else { syncer.lastUpdateTimestamp = &time.Time{} } - syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName)) + syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName)) syncer.syncBundle(ctx) } @@ -80,16 +76,19 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() return ×tamp } -func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { +// syncBundle performs the actual sync logic and returns true if bundle was committed to transport, otherwise false. +func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) bool { lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName) if err != nil { syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return + + return false } if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed - return + return false } + // if we got here, then the last update timestamp from db is after what we have in memory. // this means something has changed in db, syncing to transport. bundleResult := syncer.createBundleFunc() @@ -97,12 +96,48 @@ func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) { if err != nil { syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName) - return + + return false } syncer.lastUpdateTimestamp = lastUpdateTimestamp - syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult) + + return true +} + +func (syncer *genericDBToTransportSyncer) periodicSync(ctx context.Context) { + ticker := time.NewTicker(syncer.intervalPolicy.GetInterval()) + + for { + select { + case <-ctx.Done(): // we have received a signal to stop + ticker.Stop() + return + + case <-ticker.C: + synced := syncer.syncBundle(ctx) + + // get current sync interval + currentInterval := syncer.intervalPolicy.GetInterval() + + // notify policy whether sync was actually performed or skipped + if synced { + syncer.intervalPolicy.Evaluate() + } else { + syncer.intervalPolicy.Reset() + } + + // get reevaluated sync interval + reevaluatedInterval := syncer.intervalPolicy.GetInterval() + + // reset ticker if needed + if currentInterval != reevaluatedInterval { + ticker.Reset(reevaluatedInterval) + syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", reevaluatedInterval.String())) + } + } + } } func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time, diff --git a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go index 0a22277..b35f8ca 100644 --- a/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go @@ -7,6 +7,7 @@ import ( configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -26,9 +27,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr dbTableName: configTableName, transport: transport, transportBundleKey: configsMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &configv1.Config{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go index 5fbc2fa..7e2debc 100644 --- a/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementbindings_db_to_transport_syncer.go @@ -7,6 +7,7 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -21,14 +22,14 @@ const ( func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, transport transport.Transport, syncInterval time.Duration) error { if err := mgr.Add(&genericDBToTransportSyncer{ - log: ctrl.Log.WithName("policy-db-to-transport-syncer"), + log: ctrl.Log.WithName("placement-bindings-db-to-transport-syncer"), db: db, dbTableName: placementBindingsTableName, transport: transport, transportBundleKey: placementBindingsMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} }, createBundleFunc: bundle.NewPlacementBindingBundle, + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go index 21f6430..35b7347 100644 --- a/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/placementrules_db_to_transport_syncer.go @@ -7,6 +7,7 @@ import ( appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -26,9 +27,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD dbTableName: placementRulesTableName, transport: transport, transportBundleKey: placementRulesMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go index 24aa333..6b06a7f 100644 --- a/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go +++ b/pkg/controller/dbsyncer/policies_db_to_transport_syncer.go @@ -7,6 +7,7 @@ import ( policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db" + "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy" "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ctrl "sigs.k8s.io/controller-runtime" @@ -26,9 +27,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra dbTableName: policiesTableName, transport: transport, transportBundleKey: policiesMsgKey, - syncInterval: syncInterval, createObjFunc: func() metav1.Object { return &policiesv1.Policy{} }, createBundleFunc: bundle.NewBaseBundle, + intervalPolicy: intervalpolicy.NewExponentialBackoffIntervalPolicy(syncInterval), }); err != nil { return fmt.Errorf("failed to add db to transport syncer - %w", err) } diff --git a/pkg/intervalpolicy/exponential_backoff_interval_policy.go b/pkg/intervalpolicy/exponential_backoff_interval_policy.go new file mode 100644 index 0000000..e614c1c --- /dev/null +++ b/pkg/intervalpolicy/exponential_backoff_interval_policy.go @@ -0,0 +1,63 @@ +package intervalpolicy + +import ( + "time" + + "github.com/cenkalti/backoff/v4" +) + +const ( + baseFactor = 2 + maxInterval = 60 * time.Second + maxNumOfConsecutiveEvaluationsBeforeNextBackoff = 3 +) + +// exponentialBackoffIntervalPolicy is a default interval policy. +type exponentialBackoffIntervalPolicy struct { + exponentialBackoff *backoff.ExponentialBackOff + interval time.Duration + numOfConsecutiveEvaluations int +} + +// NewExponentialBackoffIntervalPolicy creates new exponential backoff interval policy. +func NewExponentialBackoffIntervalPolicy(interval time.Duration) IntervalPolicy { + exponentialBackoff := &backoff.ExponentialBackOff{ + InitialInterval: interval, + RandomizationFactor: 0, + Multiplier: baseFactor, + MaxInterval: maxInterval, + MaxElapsedTime: 0, + Stop: 0, + Clock: backoff.SystemClock, + } + + exponentialBackoff.Reset() + + return &exponentialBackoffIntervalPolicy{ + exponentialBackoff: exponentialBackoff, + interval: interval, + numOfConsecutiveEvaluations: 0, + } +} + +// Evaluate reevaluates interval. +func (policy *exponentialBackoffIntervalPolicy) Evaluate() { + policy.numOfConsecutiveEvaluations++ + + if policy.numOfConsecutiveEvaluations == maxNumOfConsecutiveEvaluationsBeforeNextBackoff { + policy.interval = policy.exponentialBackoff.NextBackOff() + policy.numOfConsecutiveEvaluations = 0 + } +} + +// Reset resets the entire state of the policy. +func (policy *exponentialBackoffIntervalPolicy) Reset() { + policy.exponentialBackoff.Reset() + policy.interval = policy.exponentialBackoff.InitialInterval + policy.numOfConsecutiveEvaluations = 0 +} + +// GetInterval returns reevaluated interval. +func (policy *exponentialBackoffIntervalPolicy) GetInterval() time.Duration { + return policy.interval +} diff --git a/pkg/intervalpolicy/interval_policy.go b/pkg/intervalpolicy/interval_policy.go new file mode 100644 index 0000000..74a27c6 --- /dev/null +++ b/pkg/intervalpolicy/interval_policy.go @@ -0,0 +1,13 @@ +package intervalpolicy + +import "time" + +// IntervalPolicy defines a policy to return interval based on the received events. +type IntervalPolicy interface { + // Evaluate evaluates next interval. + Evaluate() + // Reset resets the interval of the interval policy. + Reset() + // GetInterval returns current interval. + GetInterval() time.Duration +}