Skip to content

Commit

Permalink
FMv2 double submission avoidance improvements (#4950)
Browse files Browse the repository at this point in the history
  • Loading branch information
PiotrTrzpil committed Sep 9, 2021
1 parent 005d2af commit 0d0e60f
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 76 deletions.
48 changes: 26 additions & 22 deletions core/services/fluxmonitorv2/flux_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,19 +636,27 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr
return
}

if logRoundID < mostRecentRoundID {
newRoundLogger.Debugf("Received an older round log - a possible reorg, hence deleting round ids from %v to %v", logRoundID, mostRecentRoundID)
roundStats, jobRunStatus, err := fm.statsAndStatusForRound(logRoundID, 1)
if err != nil {
newRoundLogger.Errorf("error determining round stats / run status for round: %v", err)
return
}

if logRoundID < mostRecentRoundID && roundStats.NumNewRoundLogs > 0 {
newRoundLogger.Debugf("Received an older round log (and number of previously received NewRound logs is: %v) - "+
"a possible reorg, hence deleting round ids from %v to %v", roundStats.NumNewRoundLogs, logRoundID, mostRecentRoundID)
err = fm.orm.DeleteFluxMonitorRoundsBackThrough(fm.contractAddress, logRoundID)
if err != nil {
newRoundLogger.Errorf("error deleting reorged Flux Monitor rounds from DB: %v", err)
return
}
}

roundStats, jobRunStatus, err := fm.statsAndStatusForRound(logRoundID)
if err != nil {
newRoundLogger.Errorf("error determining round stats / run status for round: %v", err)
return
// as all newer stats were deleted, at this point a new round stats entry will be created
roundStats, err = fm.orm.FindOrCreateFluxMonitorRoundStats(fm.contractAddress, logRoundID, 1)
if err != nil {
newRoundLogger.Errorf("error determining subsequent round stats for round: %v", err)
return
}
}

if roundStats.NumSubmissions > 0 {
Expand Down Expand Up @@ -742,7 +750,7 @@ func (fm *FluxMonitor) respondToNewRoundLog(log flux_aggregator_wrapper.FluxAggr
if err2 != nil {
return err2
}
err2 = fm.queueTransactionForBPTXM(tx, runID, answer, roundState.RoundId)
err2 = fm.queueTransactionForBPTXM(tx, runID, answer, roundState.RoundId, &log)
if err2 != nil {
return err2
}
Expand Down Expand Up @@ -868,7 +876,7 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker
}
}()

roundStats, jobRunStatus, err := fm.statsAndStatusForRound(roundState.RoundId)
roundStats, jobRunStatus, err := fm.statsAndStatusForRound(roundState.RoundId, 0)
if err != nil {
l.Errorw("error determining round stats / run status for round", "err", err)

Expand Down Expand Up @@ -972,7 +980,7 @@ func (fm *FluxMonitor) pollIfEligible(pollReq PollRequestType, deviationChecker
if err2 != nil {
return err2
}
err2 = fm.queueTransactionForBPTXM(tx, runID, answer, roundState.RoundId)
err2 = fm.queueTransactionForBPTXM(tx, runID, answer, roundState.RoundId, nil)
if err2 != nil {
return err2
}
Expand Down Expand Up @@ -1049,12 +1057,7 @@ func (fm *FluxMonitor) initialRoundState() flux_aggregator_wrapper.OracleRoundSt
return latestRoundState
}

func (fm *FluxMonitor) queueTransactionForBPTXM(
db *gorm.DB,
runID int64,
answer decimal.Decimal,
roundID uint32,
) error {
func (fm *FluxMonitor) queueTransactionForBPTXM(db *gorm.DB, runID int64, answer decimal.Decimal, roundID uint32, log *flux_aggregator_wrapper.FluxAggregatorNewRound) error {
// Submit the Eth Tx
err := fm.contractSubmitter.Submit(
db,
Expand All @@ -1065,12 +1068,17 @@ func (fm *FluxMonitor) queueTransactionForBPTXM(
return err
}

numLogs := uint(0)
if log != nil {
numLogs = 1
}
// Update the flux monitor round stats
err = fm.orm.UpdateFluxMonitorRoundStats(
db,
fm.contractAddress,
roundID,
runID,
numLogs,
)
if err != nil {
fm.logger.Errorw(
Expand All @@ -1084,12 +1092,8 @@ func (fm *FluxMonitor) queueTransactionForBPTXM(
return nil
}

func (fm *FluxMonitor) statsAndStatusForRound(roundID uint32) (
FluxMonitorRoundStatsV2,
pipeline.RunStatus,
error,
) {
roundStats, err := fm.orm.FindOrCreateFluxMonitorRoundStats(fm.contractAddress, roundID)
func (fm *FluxMonitor) statsAndStatusForRound(roundID uint32, newRoundLogs uint) (FluxMonitorRoundStatsV2, pipeline.RunStatus, error) {
roundStats, err := fm.orm.FindOrCreateFluxMonitorRoundStats(fm.contractAddress, roundID, newRoundLogs)
if err != nil {
return FluxMonitorRoundStatsV2{}, pipeline.RunStatusUnknown, err
}
Expand Down

0 comments on commit 0d0e60f

Please sign in to comment.