-
Notifications
You must be signed in to change notification settings - Fork 339
/
agent.go
409 lines (334 loc) · 9.89 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
// Copyright 2022 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package storageincentives
import (
"context"
"crypto/rand"
"errors"
"fmt"
"io"
"math"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethersphere/bee/pkg/crypto"
"github.com/ethersphere/bee/pkg/log"
"github.com/ethersphere/bee/pkg/postage"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/storageincentives/redistribution"
"github.com/ethersphere/bee/pkg/swarm"
)
// loggerName is the name given to the logger that New derives via WithName.
const loggerName = "storageincentives"
const (
// DefaultBlocksPerRound is the default length of one round, in blocks.
// NOTE(review): presumably passed by callers as New's blocksPerRound argument — confirm at the call site.
DefaultBlocksPerRound = 152
// DefaultBlocksPerPhase is the default phase length, in blocks: a quarter of
// DefaultBlocksPerRound (152 / 4 = 38).
DefaultBlocksPerPhase = DefaultBlocksPerRound / 4
)
// ChainBackend is the part of a blockchain client that the agent depends on.
type ChainBackend interface {
// BlockNumber returns a block number; the agent uses it as the current chain height
// from which the round and phase are derived.
BlockNumber(context.Context) (uint64, error)
// HeaderByNumber returns the header of the block with the given number; the agent
// reads the header's Time field when creating a reserve sample.
HeaderByNumber(context.Context, *big.Int) (*types.Header, error)
}
// Monitor reports sync status to the agent.
type Monitor interface {
// IsFullySynced reports whether the node is fully synced; the agent does not
// produce a reserve sample while this returns false.
IsFullySynced() bool
}
// Agent drives the node through the commit, reveal and claim phases of each round
// by polling the chain backend and calling the redistribution contract.
// It is created with New and stopped with Close.
type Agent struct {
// logger is the named logger derived in New.
logger log.Logger
// metrics holds the counters and gauges updated by the agent.
metrics metrics
// backend provides the current block number and block headers.
backend ChainBackend
// blocksPerRound is the round length in blocks, used to locate the previous round when sampling.
blocksPerRound uint64
// monitor tells whether the node is fully synced.
monitor Monitor
// contract is the redistribution contract the agent plays against.
contract redistribution.Contract
// reserve supplies the reserve state from which the storage radius is read.
reserve postage.Storer
// sampler produces the reserve sample.
sampler storage.Sampler
// overlay is the node's overlay address; it is mixed into the commit hash.
overlay swarm.Address
// quit is closed by Close to stop the polling loop.
quit chan struct{}
// wg tracks the polling goroutine started by New.
wg sync.WaitGroup
}
// New builds an Agent from the given dependencies and immediately launches its
// polling goroutine. The goroutine polls every few multiples of blockTime and
// uses blocksPerRound and blocksPerPhase to derive the round and phase from the
// block number. Call Close on the returned Agent to stop it.
func New(
	overlay swarm.Address,
	backend ChainBackend,
	logger log.Logger,
	monitor Monitor,
	contract redistribution.Contract,
	reserve postage.Storer,
	sampler storage.Sampler,
	blockTime time.Duration, blocksPerRound, blocksPerPhase uint64) *Agent {

	agent := new(Agent)
	agent.overlay = overlay
	agent.metrics = newMetrics()
	agent.backend = backend
	agent.logger = logger.WithName(loggerName).Register()
	agent.contract = contract
	agent.reserve = reserve
	agent.monitor = monitor
	agent.blocksPerRound = blocksPerRound
	agent.sampler = sampler
	agent.quit = make(chan struct{})

	// Register the worker before starting it so Close can wait for it.
	agent.wg.Add(1)
	go agent.start(blockTime, blocksPerRound, blocksPerPhase)

	return agent
}
// start polls the backend for the current block number, derives the round and the phase from it, and publishes
// each phase to the phase events only once, on the poll where the phase changes.
// Each round is blocksPerRound blocks long. The block offset p within the round selects the phase:
// commit for p < blocksPerPhase, reveal for blocksPerPhase <= p <= 2*blocksPerPhase, and claim for the rest of the round.
// The sample phase is triggered upon entering the claim phase and may run until the end of the commit phase.
// If our neighborhood is selected to participate, a sample is created during the sample phase. In the commit phase
// of the following round, an obfuscated hash of the sample is submitted, and in the reveal phase, the sample and the
// obfuscation key from the commit phase are submitted.
// Next, in the claim phase, we check if we've won, and the cycle repeats. The cycle must occur in the length of one round.
// start returns when a.quit is closed, and signals a.wg on exit.
func (a *Agent) start(blockTime time.Duration, blocksPerRound, blocksPerPhase uint64) {
defer a.wg.Done()
var (
// mtx guards the variables below, which are read and written both by the
// polling loop and by the phase handlers.
mtx sync.Mutex
// sampleRound, commitRound and revealRound record the round in which the last
// successful sample, commit and reveal happened. math.MaxUint64 means "not yet".
sampleRound uint64 = math.MaxUint64
commitRound uint64 = math.MaxUint64
revealRound uint64 = math.MaxUint64
// round is the current round, updated on every successful poll.
round uint64
// reserveSample and storageRadius are the results of the last successful sample.
reserveSample []byte
// obfuscationKey is the key returned by the last successful commit.
obfuscationKey []byte
storageRadius uint8
// NOTE(review): newEvents is defined outside this file view. Handlers lock mtx while
// Publish is called with mtx held, so Publish presumably dispatches handlers
// asynchronously — confirm.
phaseEvents = newEvents()
)
// cancel all possible running phases
defer phaseEvents.Close()
// commitF submits the commitment for the sample taken in the previous round and,
// on success, remembers the obfuscation key and the round of the commit.
commitF := func(ctx context.Context) {
phaseEvents.Cancel(claim)
// take a consistent snapshot of the shared state
mtx.Lock()
round := round
sampleRound := sampleRound
storageRadius := storageRadius
reserveSample := reserveSample
mtx.Unlock()
if round-1 == sampleRound { // the sample has to come from previous round to be able to commit it
obf, err := a.commit(ctx, storageRadius, reserveSample)
if err != nil {
a.logger.Error(err, "commit")
} else {
mtx.Lock()
obfuscationKey = obf
commitRound = round
mtx.Unlock()
a.logger.Debug("committed the reserve sample and radius")
}
}
}
// when the sample finishes, if we are in the commit phase, run commit
phaseEvents.On(sampleEnd, func(ctx context.Context, previous PhaseType) {
if previous == commit {
commitF(ctx)
}
})
// when we enter the commit phase, if the sample is already finished, run commit
phaseEvents.On(commit, func(ctx context.Context, previous PhaseType) {
if previous == sampleEnd {
commitF(ctx)
}
})
// when we enter the reveal phase, reveal the sample committed earlier in this round
phaseEvents.On(reveal, func(ctx context.Context, _ PhaseType) {
// cancel previous executions of the commit and sample phases
phaseEvents.Cancel(commit, sample, sampleEnd)
mtx.Lock()
round := round
commitRound := commitRound
storageRadius := storageRadius
reserveSample := reserveSample
obfuscationKey := obfuscationKey
mtx.Unlock()
if round == commitRound { // reveal requires the obfuscationKey from the same round
err := a.reveal(ctx, storageRadius, reserveSample, obfuscationKey)
if err != nil {
a.logger.Error(err, "reveal")
} else {
mtx.Lock()
revealRound = round
mtx.Unlock()
a.logger.Debug("revealed the sample with the obfuscation key")
}
}
})
// when we enter the claim phase, check for a win and claim it
phaseEvents.On(claim, func(ctx context.Context, _ PhaseType) {
phaseEvents.Cancel(reveal)
mtx.Lock()
round := round
revealRound := revealRound
mtx.Unlock()
if round == revealRound { // to claim, previous reveal must've happened in the same round
err := a.claim(ctx)
if err != nil {
a.logger.Error(err, "claim")
}
}
})
// the sample handler creates the reserve sample and always publishes sampleEnd afterwards,
// whether or not a sample was produced
phaseEvents.On(sample, func(ctx context.Context, _ PhaseType) {
// capture the round before sampling, so the sample is attributed to the round in which it started
mtx.Lock()
round := round
mtx.Unlock()
sr, smpl, err := a.play(ctx)
if err != nil {
a.logger.Error(err, "make sample")
} else if smpl != nil {
mtx.Lock()
sampleRound = round
reserveSample = smpl
storageRadius = sr
a.logger.Info("produced reserve sample", "round", round)
mtx.Unlock()
}
phaseEvents.Publish(sampleEnd)
})
var (
// prevPhase starts at -1 so that the first computed phase is always published.
prevPhase PhaseType = -1
currentPhase PhaseType
// checkEvery is the polling interval, in multiples of blockTime.
checkEvery uint64 = 1
)
// optimization, we do not need to check the phase change at every new block
if blocksPerPhase > 10 {
checkEvery = 5
}
for {
// wait for the next poll, or stop when the agent is closed
select {
case <-a.quit:
return
case <-time.After(blockTime * time.Duration(checkEvery)):
}
a.metrics.BackendCalls.Inc()
block, err := a.backend.BlockNumber(context.Background())
if err != nil {
a.metrics.BackendErrors.Inc()
a.logger.Error(err, "getting block number")
continue
}
mtx.Lock()
round = block / blocksPerRound
a.metrics.Round.Set(float64(round))
// TODO: to be changed for the mainnet
// compute the current phase
// (the ranges in the comments below are for blocksPerRound = 152 and blocksPerPhase = 38)
p := block % blocksPerRound
if p < blocksPerPhase {
currentPhase = commit // [0, 37]
} else if p >= blocksPerPhase && p <= 2*blocksPerPhase { // [38, 76]
currentPhase = reveal
} else if p > 2*blocksPerPhase {
currentPhase = claim // [77, 151]
}
// write the current phase only once
if currentPhase != prevPhase {
a.metrics.CurrentPhase.Set(float64(currentPhase))
a.logger.Info("entering phase", "phase", currentPhase.String(), "round", round, "block", block)
phaseEvents.Publish(currentPhase)
if currentPhase == claim {
phaseEvents.Publish(sample) // trigger sample along side the claim phase
}
}
prevPhase = currentPhase
mtx.Unlock()
}
}
// reveal submits the storage radius, the reserve sample and the obfuscation key
// to the contract. It counts every attempt in the RevealPhase metric and every
// failure in ErrReveal, and returns the contract's error unchanged.
func (a *Agent) reveal(ctx context.Context, storageRadius uint8, sample, obfuscationKey []byte) error {
	a.metrics.RevealPhase.Inc()

	if err := a.contract.Reveal(ctx, storageRadius, sample, obfuscationKey); err != nil {
		a.metrics.ErrReveal.Inc()
		return err
	}
	return nil
}
// claim asks the contract whether this node won the round and, if it did,
// claims the win. It returns the IsWinner error unchanged, or the Claim error
// wrapped with context; a lost round is not an error.
func (a *Agent) claim(ctx context.Context) error {
	a.metrics.ClaimPhase.Inc()

	// event claimPhase was processed
	won, err := a.contract.IsWinner(ctx)
	if err != nil {
		a.metrics.ErrWinner.Inc()
		return err
	}

	if !won {
		a.logger.Info("claim made, lost round")
		return nil
	}

	a.metrics.Winner.Inc()
	if claimErr := a.contract.Claim(ctx); claimErr != nil {
		a.metrics.ErrClaim.Inc()
		return fmt.Errorf("error claiming win: %w", claimErr)
	}

	a.logger.Info("claimed win")
	return nil
}
// play decides whether this node takes part in the current round and, if it
// does, creates the reserve sample.
//
// It returns the storage radius that was used and the bytes of the sample hash.
// A nil sample together with a nil error means that no sample was produced,
// either because the monitor reports that the node is not fully synced or
// because the contract reports that the node is not playing this round.
// Any error from the contract, the backend or the sampler is returned as is.
func (a *Agent) play(ctx context.Context) (uint8, []byte, error) {
	// get depthmonitor fully synced indicator
	if !a.monitor.IsFullySynced() {
		return 0, nil, nil
	}

	storageRadius := a.reserve.GetReserveState().StorageRadius

	isPlaying, err := a.contract.IsPlaying(ctx, storageRadius)
	if err != nil {
		a.metrics.ErrCheckIsPlaying.Inc()
		return 0, nil, err
	}
	if !isPlaying {
		// Not being selected is a regular outcome, so it must not be counted
		// in the error metric.
		return 0, nil, nil
	}

	a.logger.Info("neighbourhood chosen")
	a.metrics.NeighborhoodSelected.Inc()

	salt, err := a.contract.ReserveSalt(ctx)
	if err != nil {
		return 0, nil, err
	}

	// SampleDuration covers the backend lookups as well as the sampling itself.
	started := time.Now()

	a.metrics.BackendCalls.Inc()
	block, err := a.backend.BlockNumber(ctx)
	if err != nil {
		a.metrics.BackendErrors.Inc()
		return 0, nil, err
	}

	// The time limit is taken from the first block of the previous round.
	// In round 0 there is no previous round; subtracting 1 would wrap around
	// the unsigned round number and request a nonsensical block.
	currentRound := block / a.blocksPerRound
	if currentRound == 0 {
		return 0, nil, errors.New("reserve sample: no previous round in round 0")
	}
	previousRoundStart := (currentRound - 1) * a.blocksPerRound

	a.metrics.BackendCalls.Inc()
	timeLimiterBlock, err := a.backend.HeaderByNumber(ctx, new(big.Int).SetUint64(previousRoundStart))
	if err != nil {
		a.metrics.BackendErrors.Inc()
		return 0, nil, err
	}

	// The header time is scaled by time.Second, i.e. the sampler receives the
	// block timestamp expressed in nanoseconds.
	timeLimiter := uint64(time.Duration(timeLimiterBlock.Time) * time.Second)

	sample, err := a.sampler.ReserveSample(ctx, salt, storageRadius, timeLimiter)
	if err != nil {
		return 0, nil, err
	}

	a.metrics.SampleDuration.Set(time.Since(started).Seconds())

	return storageRadius, sample.Hash.Bytes(), nil
}
// commit generates a fresh random obfuscation key of swarm.HashSize bytes,
// hashes it together with the overlay, storage radius and sample (see
// wrapCommit), and submits that hash to the contract.
// On success it returns the key, which is needed later for the reveal.
func (a *Agent) commit(ctx context.Context, storageRadius uint8, sample []byte) ([]byte, error) {
	a.metrics.CommitPhase.Inc()

	obfuscationKey := make([]byte, swarm.HashSize)
	_, err := io.ReadFull(rand.Reader, obfuscationKey)
	if err != nil {
		return nil, err
	}

	commitment, err := a.wrapCommit(storageRadius, sample, obfuscationKey)
	if err != nil {
		return nil, err
	}

	if err := a.contract.Commit(ctx, commitment); err != nil {
		a.metrics.ErrCommit.Inc()
		return nil, err
	}

	return obfuscationKey, nil
}
// Close signals the polling goroutine to stop by closing the quit channel and
// waits up to five seconds for it to finish. It returns an error if the
// goroutine is still running when the wait times out.
func (a *Agent) Close() error {
	close(a.quit)

	// done is closed once every goroutine tracked by wg has returned.
	done := make(chan struct{})
	go func() {
		defer close(done)
		a.wg.Wait()
	}()

	select {
	case <-time.After(5 * time.Second):
		return errors.New("stopping incentives with ongoing worker goroutine")
	case <-done:
		return nil
	}
}
// wrapCommit computes the commitment hash: the legacy Keccak-256 of
// overlay || storageRadius || sample || key.
//
// The input is assembled in a freshly allocated buffer. Appending directly to
// the slice returned by overlay.Bytes() could write into that slice's backing
// array when it has spare capacity.
func (a *Agent) wrapCommit(storageRadius uint8, sample []byte, key []byte) ([]byte, error) {
	overlay := a.overlay.Bytes()

	data := make([]byte, 0, len(overlay)+1+len(sample)+len(key))
	data = append(data, overlay...)
	data = append(data, storageRadius)
	data = append(data, sample...)
	data = append(data, key...)

	return crypto.LegacyKeccak256(data)
}