-
Notifications
You must be signed in to change notification settings - Fork 2
/
generic_db_to_transport_syncer.go
152 lines (120 loc) · 4.68 KB
/
generic_db_to_transport_syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package dbsyncer
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/go-logr/logr"
datatypes "github.com/open-cluster-management/hub-of-hubs-data-types"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/bundle"
hohDb "github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/db"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/intervalpolicy"
"github.com/open-cluster-management/hub-of-hubs-spec-transport-bridge/pkg/transport"
)
// timeFormat is the layout used to render a bundle timestamp as a transport version string
// and to parse such a version string back into a time.
const timeFormat = "2006-01-02_15-04-05.000000"
// genericDBToTransportSyncer reads a bundle from a single db table and sends it through the transport layer
// whenever the table's last update timestamp is newer than the one held in memory.
type genericDBToTransportSyncer struct {
	// log is the logger used for all syncer messages.
	log logr.Logger
	// db is queried for the table's last update timestamp and for the bundle content.
	db hohDb.HubOfHubsSpecDB
	// dbTableName is the name of the db table this syncer reads from.
	dbTableName string
	// transport is queried for the last sent version and used to send bundles.
	transport transport.Transport
	// transportBundleKey is the id under which bundles are sent to (and versions are read from) transport.
	transportBundleKey string
	// lastUpdateTimestamp is the timestamp the db is compared against to decide whether a sync is needed.
	lastUpdateTimestamp *time.Time
	// createObjFunc is passed to the db when reading a bundle.
	createObjFunc bundle.CreateObjectFunction
	// createBundleFunc creates the empty bundle that the db fills in.
	createBundleFunc bundle.CreateBundleFunction
	// intervalPolicy provides the sync interval and is notified after every sync attempt.
	intervalPolicy intervalpolicy.IntervalPolicy
}
// Start initializes the syncer, launches the periodic sync loop in a background goroutine and then blocks
// until stopChannel delivers a value or is closed. The context handed to the sync loop is cancelled before
// returning. Start always returns nil.
func (syncer *genericDBToTransportSyncer) Start(stopChannel <-chan struct{}) error {
	syncCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// initialization runs synchronously, the periodic loop runs in the background.
	syncer.init(syncCtx)

	go syncer.periodicSync(syncCtx)

	// block here until a stop event arrives.
	<-stopChannel

	// cancel explicitly so the sync loop is signalled before the "stopped" message is logged.
	cancel()
	syncer.log.Info("stopped syncer", "table", syncer.dbTableName)

	return nil
}
// init seeds lastUpdateTimestamp and performs a first sync attempt.
//
// The seed is the version currently held by the transport layer, i.e. the timestamp of the last update the
// transport bridge sent. When transport has no usable version, the zero time is used instead. The sync attempt
// that follows compares the db against this seed, sends a bundle if the db is newer and updates
// lastUpdateTimestamp accordingly.
func (syncer *genericDBToTransportSyncer) init(ctx context.Context) {
	seed := syncer.initLastUpdateTimestampFromTransport()
	if seed == nil {
		// nothing usable in transport, start from the zero time.
		seed = &time.Time{}
	}

	syncer.lastUpdateTimestamp = seed

	syncer.log.Info("initialized syncer", "table", fmt.Sprintf("spec.%s", syncer.dbTableName))

	syncer.syncBundle(ctx)
}
func (syncer *genericDBToTransportSyncer) initLastUpdateTimestampFromTransport() *time.Time {
version := syncer.transport.GetVersion(syncer.transportBundleKey, datatypes.SpecBundle)
if version == "" {
return nil
}
timestamp, err := time.Parse(timeFormat, version)
if err != nil {
return nil
}
return ×tamp
}
// syncBundle performs the actual sync logic. It returns true if a bundle was read from the db and handed to
// syncToTransport, and false if the db could not be read or nothing has changed since the last sync.
//
// NOTE(review): the in-memory timestamp is advanced before the bundle is sent and syncToTransport reports no
// result, so a failure while sending is not retried on the next call.
func (syncer *genericDBToTransportSyncer) syncBundle(ctx context.Context) bool {
	lastUpdateTimestamp, err := syncer.db.GetLastUpdateTimestamp(ctx, syncer.dbTableName)
	if err != nil {
		// logr expects key/value pairs, so the table name is logged under an explicit key.
		syncer.log.Error(err, "unable to sync bundle to leaf hubs", "table", syncer.dbTableName)

		return false
	}

	if !lastUpdateTimestamp.After(*syncer.lastUpdateTimestamp) { // sync only if something has changed
		return false
	}

	// if we got here, then the last update timestamp from db is after what we have in memory.
	// this means something has changed in db, syncing to transport.
	bundleResult := syncer.createBundleFunc()

	lastUpdateTimestamp, err = syncer.db.GetBundle(ctx, syncer.dbTableName, syncer.createObjFunc, bundleResult)
	if err != nil {
		syncer.log.Error(err, "unable to sync bundle to leaf hubs", "table", syncer.dbTableName)

		return false
	}

	// use the timestamp returned together with the bundle, as the db may have changed between the two queries.
	syncer.lastUpdateTimestamp = lastUpdateTimestamp

	syncer.syncToTransport(syncer.transportBundleKey, datatypes.SpecBundle, lastUpdateTimestamp, bundleResult)

	return true
}
// periodicSync runs a sync attempt on every tick until ctx is done.
//
// After each attempt the interval policy is told whether a bundle was synced (Evaluate) or the attempt was
// skipped (Reset). If the policy's interval changed as a result, the ticker is reset to the new interval.
func (syncer *genericDBToTransportSyncer) periodicSync(ctx context.Context) {
	ticker := time.NewTicker(syncer.intervalPolicy.GetInterval())
	defer ticker.Stop() // the only way out of the loop is the stop signal below

	for {
		select {
		case <-ctx.Done(): // we have received a signal to stop
			return

		case <-ticker.C:
			bundleSynced := syncer.syncBundle(ctx)

			// interval in effect before the policy is notified
			intervalBefore := syncer.intervalPolicy.GetInterval()

			// notify policy whether sync was actually performed or skipped
			if bundleSynced {
				syncer.intervalPolicy.Evaluate()
			} else {
				syncer.intervalPolicy.Reset()
			}

			// interval in effect after the policy was notified
			intervalAfter := syncer.intervalPolicy.GetInterval()
			if intervalAfter == intervalBefore {
				continue // interval unchanged, keep the ticker as is
			}

			ticker.Reset(intervalAfter)
			syncer.log.Info(fmt.Sprintf("sync interval has been reset to %s", intervalAfter.String()))
		}
	}
}
// syncToTransport marshals payload to JSON and sends it asynchronously through the transport layer.
//
// id and objType identify the message, and timestamp, formatted with timeFormat, is sent as its version.
// If marshalling fails, the error is logged and nothing is sent.
func (syncer *genericDBToTransportSyncer) syncToTransport(id string, objType string, timestamp *time.Time,
	payload bundle.Bundle) {
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		// logr expects key/value pairs, so object type and id are logged under explicit keys.
		syncer.log.Error(err, "failed to sync object", "objectType", objType, "id", id)

		return
	}

	syncer.transport.SendAsync(id, objType, timestamp.Format(timeFormat), payloadBytes)
}