Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Floating sync intervals #9

Merged
merged 15 commits into from Nov 12, 2021
Merged
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -7,7 +7,8 @@ require (
github.com/go-logr/zapr v0.2.0 // indirect
github.com/jackc/pgx/v4 v4.11.0
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect
github.com/operator-framework/operator-sdk v0.19.4
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Expand Up @@ -771,6 +771,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ=
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68=
Expand Down
100 changes: 67 additions & 33 deletions pkg/controller/dbsyncer/generic_db_to_transport_syncer.go
Expand Up @@ -23,47 +23,41 @@ type genericDBToTransportSyncer struct {
dbTableName string
transport transport.Transport
transportBundleKey string
syncInterval time.Duration
lastUpdateTimestamp *time.Time
createObjFunc bundle.CreateObjectFunction
createBundleFunc bundle.CreateBundleFunction
intervalPolicy syncerIntervalPolicy
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
}

func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()

syncer.init(ctx)
ticker := time.NewTicker(syncer.syncInterval)
syncer.init()

for {
select {
case <-stopChannel:
ticker.Stop()
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)
go syncer.syncBundle(ctx)
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

return nil
case <-ticker.C:
go syncer.syncBundle(ctx)
}
}
<-stopChannel // blocking wait for stop event
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)

return nil
}

func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
func (syncer *genericDBToTransportSyncer) init() {
// on initialization, we initialize the lastUpdateTimestamp from the transport layer, as this is the last timestamp
// that transport bridge sent an update.
// later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with
// transport layer and update the lastUpdateTimestamp field accordingly.
timestamp := syncer.initLastUpdateTimestampFromTransport()

if timestamp != nil {
syncer.lastUpdateTimestamp = timestamp
} else {
syncer.lastUpdateTimestamp = &time.Time{}
}

syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.syncBundle(ctx)
syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
}

func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() *time.Time {
Expand All @@ -81,28 +75,68 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport()
}

func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) {
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return
}
currentSyncInterval := syncer.intervalPolicy.getInterval()
ticker := time.NewTicker(currentSyncInterval)

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
return
for {
select {
case <-ctx.Done(): // we have received a signal to stop
ticker.Stop()
return

case <-ticker.C:
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.adjustTicker(ticker, &currentSyncInterval, false)
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)

continue
}

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
syncer.adjustTicker(ticker, &currentSyncInterval, false)

continue
}

// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)

if err != nil {
syncer.adjustTicker(ticker, &currentSyncInterval, false)
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)

continue
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)
syncer.adjustTicker(ticker, &currentSyncInterval, true)
}
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
}
// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)
}

if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return
func (syncer *genericDBToTransportSyncer) adjustTicker(ticker *time.Ticker, currentSyncInterval *time.Duration,
syncPerformed bool) {
// notify policy whether sync was actually performed or skipped
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
if syncPerformed {
syncer.intervalPolicy.onSyncPerformed()
} else {
syncer.intervalPolicy.onSyncSkipped()
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp
// get recalculated sync interval
interval := syncer.intervalPolicy.getInterval()

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)
// reset ticker if sync interval has changed
if interval != *currentSyncInterval {
*currentSyncInterval = interval
ticker.Reset(*currentSyncInterval)
syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", currentSyncInterval.String()))
}
}

func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time,
Expand Down
Expand Up @@ -26,9 +26,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr
dbTableName: configTableName,
transport: transport,
transportBundleKey: configsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &configv1.Config{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -26,9 +26,9 @@ func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSp
dbTableName: placementBindingsTableName,
transport: transport,
transportBundleKey: placementBindingsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} },
createBundleFunc: bundle.NewPlacementBindingBundle,
intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -26,9 +26,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD
dbTableName: placementRulesTableName,
transport: transport,
transportBundleKey: placementRulesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/dbsyncer/policies_db_to_transport_syncer.go
Expand Up @@ -26,9 +26,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra
dbTableName: policiesTableName,
transport: transport,
transportBundleKey: policiesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.Policy{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: newDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/controller/dbsyncer/syncer_interval_policy.go
@@ -0,0 +1,58 @@
package dbsyncer

import (
"time"
)

const (
maxNumOfConsecutiveSyncs = 3
maxNumOfIncrements = 3
floatingFactor = 2
)

// syncerIntervalPolicy defines a policy to return syncer "floating" interval.
type syncerIntervalPolicy interface {
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
onSyncPerformed()
onSyncSkipped()
getInterval() time.Duration
}

// defaultSyncerIntervalPolicy is a default syncer "floating" interval policy.
type defaultSyncerIntervalPolicy struct {
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
originalInterval time.Duration
floatingInterval time.Duration
numOfConsecutiveSyncs int64
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
numOfIncrements int64
}

func newDefaultSyncerIntervalPolicy(interval time.Duration) *defaultSyncerIntervalPolicy {
return &defaultSyncerIntervalPolicy{originalInterval: interval, floatingInterval: interval}
}

// onSyncPerformed recalculates floating interval if sync happened.
func (policy *defaultSyncerIntervalPolicy) onSyncPerformed() {
// we've reached maximum number of interval's increments due to consecutive syncs - there is nothing to do
if policy.numOfIncrements == maxNumOfIncrements {
return
}

policy.numOfConsecutiveSyncs++

// we've reached maximum number of consecutive sync - increment the interval
if policy.numOfConsecutiveSyncs == maxNumOfConsecutiveSyncs {
policy.floatingInterval *= floatingFactor
policy.numOfConsecutiveSyncs = 0
policy.numOfIncrements++
}
}

// onSyncSkipped resets the entire state of the policy if sync was skipped.
func (policy *defaultSyncerIntervalPolicy) onSyncSkipped() {
policy.numOfConsecutiveSyncs = 0
policy.numOfIncrements = 0
policy.floatingInterval = policy.originalInterval
}

func (policy *defaultSyncerIntervalPolicy) getInterval() time.Duration {
return policy.floatingInterval
}