Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Floating sync intervals #9

Merged
merged 15 commits into from Nov 12, 2021
Merged
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -7,7 +7,8 @@ require (
github.com/go-logr/zapr v0.2.0 // indirect
github.com/jackc/pgx/v4 v4.11.0
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect
github.com/operator-framework/operator-sdk v0.19.4
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Expand Up @@ -771,6 +771,12 @@ github.com/open-cluster-management/governance-policy-propagator v0.0.0-202105202
github.com/open-cluster-management/governance-policy-propagator v0.0.0-20210520203318-a78632de1e26/go.mod h1:7gzDyC7PddcuInXS+kLrfYmto/ImSORibXwy8RGjNM8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0 h1:w7DbcYX27W6qCnumAj0rQbvsLVwFY0ggy6ZXpbqPsus=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.0/go.mod h1:OOI0mxL5BOSq0TN6dVBfHiA6PUeapn+sMBXm9fLNaw8=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4 h1:UueRf7YhBhHhWgop66CQxosFSQ/cUgNuNSOg8UPfOA0=
github.com/open-cluster-management/hub-of-hubs-data-types v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:Th2t5lukBfvPb5tvNCMROedinKR4s64sK5hgMt8Er7w=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0 h1:qgTStCi+y6/nkFRF+cXHGLK50mGxA7YXB2Ey8BTsmyc=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.0/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4 h1:BUTr6y9DEgHVT6pVDXfKYfw3pTTJ9aZP/MVbMDUXOOA=
github.com/open-cluster-management/hub-of-hubs-data-types/apis/config v0.1.1-0.20210913120916-7dc589acefb4/go.mod h1:xpGxw+0si9vuDABvcJ9FxjznPw/uAbICzhaX+9JLLho=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2 h1:ZmX/tuCgFVbe07pqObNWf14cAYpHN8gVRSkWnbtSUW4=
github.com/open-horizon/edge-sync-service-client v0.0.0-20190711093406-dc3a19905da2/go.mod h1:rKW01VtqwtwaUQ+zkJ80X5EeGqsTblQE4J9HxPUtTAQ=
github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68=
Expand Down
103 changes: 70 additions & 33 deletions pkg/controller/dbsyncer/generic_db_to_transport_syncer.go
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
datatypes "github.com/open-cluster-management/hub-of-hubs-data-types"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy"
hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
)
Expand All @@ -23,47 +24,41 @@ type genericDBToTransportSyncer struct {
dbTableName string
transport transport.Transport
transportBundleKey string
syncInterval time.Duration
lastUpdateTimestamp *time.Time
createObjFunc bundle.CreateObjectFunction
createBundleFunc bundle.CreateBundleFunction
intervalPolicy intervalpolicy.SyncerIntervalPolicy
}

func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error {
ctx, cancelContext := context.WithCancel(context.Background())
defer cancelContext()

syncer.init(ctx)
ticker := time.NewTicker(syncer.syncInterval)
syncer.init()

for {
select {
case <-stopChannel:
ticker.Stop()
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)
go syncer.syncBundle(ctx)
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

return nil
case <-ticker.C:
go syncer.syncBundle(ctx)
}
}
<-stopChannel // blocking wait for stop event
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
cancelContext()
syncer.log.Info("stopped syncer", "table", syncer.dbTableName)

return nil
}

func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
func (syncer *genericDBToTransportSyncer) init() {
// on initialization, we initialize the lastUpdateTimestamp from the transport layer, as this is the last timestamp
// that transport bridge sent an update.
// later, in SyncBundle, it will check the db if there are newer updates and if yes it will send it with
// transport layer and update the lastUpdateTimestamp field accordingly.
timestamp := syncer.initLastUpdateTimestampFromTransport()

if timestamp != nil {
syncer.lastUpdateTimestamp = timestamp
} else {
syncer.lastUpdateTimestamp = &time.Time{}
}

syncer.log.Info("initialzed syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
syncer.syncBundle(ctx)
syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))
}

func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() *time.Time {
Expand All @@ -81,28 +76,70 @@ func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport()
}

func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) {
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return
}
ticker := time.NewTicker(syncer.intervalPolicy.GetInterval())

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
return
for {
select {
case <-ctx.Done(): // we have received a signal to stop
ticker.Stop()
return

case <-ticker.C:
lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
if err != nil {
syncer.completeSync(ticker, false)
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)

continue
}

if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
syncer.completeSync(ticker, false)

continue
}

// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)

if err != nil {
syncer.completeSync(ticker, false)
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)

continue
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)
syncer.completeSync(ticker, true)
}
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
}
// if we got here, then the last update timestamp from db is after what we have in memory.
// this means something has changed in db, syncing to transport.
bundleResult := syncer.createBundleFunc()
lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)
}

if err != nil {
syncer.log.Error(err, "unable to sync bundle to leaf hubs", syncer.dbTableName)
return
// completeSync notifies policy whether sync was actually performed or skipped and resets ticker's interval
// to a new recalculated one.
func (syncer *genericDBToTransportSyncer) completeSync(ticker *time.Ticker, syncPerformed bool) {
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
// get current sync interval
currentInterval := syncer.intervalPolicy.GetInterval()
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

// notify policy whether sync was actually performed or skipped
vMaroon marked this conversation as resolved.
Show resolved Hide resolved
if syncPerformed {
syncer.intervalPolicy.OnSyncPerformed()
} else {
syncer.intervalPolicy.OnSyncSkipped()
}

syncer.lastUpdateTimestamp = lastUpdateTimestamp
// get recalculated sync interval
recalculatedInterval := syncer.intervalPolicy.GetInterval()
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)
ticker.Reset(recalculatedInterval)
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved

if currentInterval != recalculatedInterval {
syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", recalculatedInterval.String()))
}
}

func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time,
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/hoh_config_db_to_transport_syncer.go
Expand Up @@ -6,6 +6,7 @@ import (

configv1 "github.com/open-cluster-management/hub-of-hubs-data-types/apis/config/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,9 +27,9 @@ func AddHoHConfigDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tr
dbTableName: configTableName,
transport: transport,
transportBundleKey: configsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &configv1.Config{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
@@ -0,0 +1,53 @@
package intervalpolicy

import (
"time"
)

const (
maxNumOfConsecutiveSyncs = 3
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
maxNumOfIncrements = 3
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
floatingFactor = 2
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
)

// defaultSyncerIntervalPolicy is a default syncer "floating" interval policy.
type defaultSyncerIntervalPolicy struct {
originalInterval time.Duration
floatingInterval time.Duration
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
numOfConsecutiveSyncs int64
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
numOfIncrements int64
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
}

// NewDefaultSyncerIntervalPolicy creates new default syncer interval policy.
func NewDefaultSyncerIntervalPolicy(interval time.Duration) SyncerIntervalPolicy {
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
return &defaultSyncerIntervalPolicy{originalInterval: interval, floatingInterval: interval}
}

// OnSyncPerformed recalculates floating interval if sync happened.
func (policy *defaultSyncerIntervalPolicy) OnSyncPerformed() {
// we've reached maximum number of interval's increments due to consecutive syncs - there is nothing to do
if policy.numOfIncrements == maxNumOfIncrements {
return
}

policy.numOfConsecutiveSyncs++

// we've reached maximum number of consecutive syncs - increment the interval
if policy.numOfConsecutiveSyncs == maxNumOfConsecutiveSyncs {
policy.floatingInterval *= floatingFactor
policy.numOfConsecutiveSyncs = 0
policy.numOfIncrements++
}
}

// OnSyncSkipped resets the entire state of the policy if sync was skipped.
func (policy *defaultSyncerIntervalPolicy) OnSyncSkipped() {
policy.numOfConsecutiveSyncs = 0
policy.numOfIncrements = 0
policy.floatingInterval = policy.originalInterval
}

// GetInterval returns recalculated interval bases on the received events.
func (policy *defaultSyncerIntervalPolicy) GetInterval() time.Duration {
return policy.floatingInterval
}
10 changes: 10 additions & 0 deletions pkg/controller/dbsyncer/intervalpolicy/syncer_interval_policy.go
@@ -0,0 +1,10 @@
package intervalpolicy

import "time"

// SyncerIntervalPolicy defines a policy to return syncer interval based on the received events.
type SyncerIntervalPolicy interface {
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
OnSyncPerformed()
nirrozenbaum marked this conversation as resolved.
Show resolved Hide resolved
OnSyncSkipped()
GetInterval() time.Duration
}
Expand Up @@ -6,6 +6,7 @@ import (

policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,9 +27,9 @@ func AddPlacementBindingsDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSp
dbTableName: placementBindingsTableName,
transport: transport,
transportBundleKey: placementBindingsMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.PlacementBinding{} },
createBundleFunc: bundle.NewPlacementBindingBundle,
intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
Expand Up @@ -6,6 +6,7 @@ import (

appsv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/apps/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,9 +27,9 @@ func AddPlacementRulesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecD
dbTableName: placementRulesTableName,
transport: transport,
transportBundleKey: placementRulesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &appsv1.PlacementRule{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/dbsyncer/policies_db_to_transport_syncer.go
Expand Up @@ -6,6 +6,7 @@ import (

policiesv1 "github.com/open-cluster-management/governance-policy-propagator/pkg/apis/policy/v1"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/controller/dbsyncer/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,9 +27,9 @@ func AddPoliciesDBToTransportSyncer(mgr ctrl.Manager, db db.HubOfHubsSpecDB, tra
dbTableName: policiesTableName,
transport: transport,
transportBundleKey: policiesMsgKey,
syncInterval: syncInterval,
createObjFunc: func() metav1.Object { return &policiesv1.Policy{} },
createBundleFunc: bundle.NewBaseBundle,
intervalPolicy: intervalpolicy.NewDefaultSyncerIntervalPolicy(syncInterval),
}); err != nil {
return fmt.Errorf("failed to add db to transport syncer - %w", err)
}
Expand Down