Skip to content

Commit

Permalink
Merge pull request #4470 from harmony-one/dev
Browse files Browse the repository at this point in the history
Release Candidate 2023.2.5 (dev -> main)
  • Loading branch information
ONECasey committed Jul 21, 2023
2 parents c7a63ba + 0e4b629 commit 65e5cc2
Show file tree
Hide file tree
Showing 41 changed files with 1,382 additions and 276 deletions.
2 changes: 2 additions & 0 deletions api/service/stagedstreamsync/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type syncProtocol interface {
GetRawBlocksByNumber(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([][]byte, [][]byte, sttypes.StreamID, error)
GetBlockHashes(ctx context.Context, bns []uint64, opts ...syncproto.Option) ([]common.Hash, sttypes.StreamID, error)
GetBlocksByHashes(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) ([]*types.Block, sttypes.StreamID, error)
GetReceipts(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (receipts []types.Receipts, stid sttypes.StreamID, err error)
GetNodeData(ctx context.Context, hs []common.Hash, opts ...syncproto.Option) (data [][]byte, stid sttypes.StreamID, err error)

RemoveStream(stID sttypes.StreamID) // If a stream delivers invalid data, remove the stream
StreamFailed(stID sttypes.StreamID, reason string)
Expand Down
2 changes: 1 addition & 1 deletion consensus/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool
}

func (consensus *Consensus) onAnnounceSanityChecks(recvMsg *FBFTMessage) bool {
logMsgs := consensus.FBFTLog.GetMessagesByTypeSeqView(
logMsgs := consensus.fBFTLog.GetMessagesByTypeSeqView(
msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID,
)
if len(logMsgs) > 0 {
Expand Down
43 changes: 25 additions & 18 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type VerifyBlockFunc func(*types.Block) error
type Consensus struct {
Decider quorum.Decider
// FBFTLog stores the pbft messages and blocks during FBFT process
FBFTLog *FBFTLog
fBFTLog *FBFTLog
// phase: different phase of FBFT protocol: pre-prepare, prepare, commit, finish etc
phase FBFTPhase
// current indicates what state a node is in
Expand Down Expand Up @@ -84,7 +84,7 @@ type Consensus struct {
// IgnoreViewIDCheck determines whether to ignore viewID check
IgnoreViewIDCheck *abool.AtomicBool
// consensus mutex
mutex sync.RWMutex
mutex *sync.RWMutex
// ViewChange struct
vc *viewChange
// Signal channel for proposing a new block and start new consensus
Expand Down Expand Up @@ -140,6 +140,13 @@ func (consensus *Consensus) Blockchain() core.BlockChain {
return consensus.registry.GetBlockchain()
}

func (consensus *Consensus) FBFTLog() FBFT {
return threadsafeFBFTLog{
log: consensus.fBFTLog,
mu: consensus.mutex,
}
}

// ChainReader returns the chain reader.
// This is mostly the same as Blockchain, but it returns only read methods, so we assume it's safe for concurrent use.
func (consensus *Consensus) ChainReader() engine.ChainReader {
Expand All @@ -165,11 +172,11 @@ func (consensus *Consensus) Beaconchain() core.BlockChain {

// VerifyBlock is a function used to verify the block and keep trace of verified blocks.
func (consensus *Consensus) verifyBlock(block *types.Block) error {
if !consensus.FBFTLog.IsBlockVerified(block.Hash()) {
if !consensus.fBFTLog.IsBlockVerified(block.Hash()) {
if err := consensus.BlockVerifier(block); err != nil {
return errors.Errorf("Block verification failed: %s", err)
}
consensus.FBFTLog.MarkBlockVerified(block)
consensus.fBFTLog.MarkBlockVerified(block)
}
return nil
}
Expand Down Expand Up @@ -261,21 +268,21 @@ func New(
Decider quorum.Decider, minPeers int, aggregateSig bool,
) (*Consensus, error) {
consensus := Consensus{
ShardID: shard,
mutex: &sync.RWMutex{},
ShardID: shard,
fBFTLog: NewFBFTLog(),
phase: FBFTAnnounce,
current: State{mode: Normal},
Decider: Decider,
registry: registry,
MinPeers: minPeers,
AggregateSig: aggregateSig,
host: host,
msgSender: NewMessageSender(host),
BlockNumLowChan: make(chan struct{}, 1),
// FBFT timeout
consensusTimeout: createTimeout(),
}
consensus.Decider = Decider
consensus.registry = registry
consensus.MinPeers = minPeers
consensus.AggregateSig = aggregateSig
consensus.host = host
consensus.msgSender = NewMessageSender(host)
consensus.BlockNumLowChan = make(chan struct{}, 1)
// FBFT related
consensus.FBFTLog = NewFBFTLog()
consensus.phase = FBFTAnnounce
consensus.current = State{mode: Normal}
// FBFT timeout
consensus.consensusTimeout = createTimeout()

if multiBLSPriKey != nil {
consensus.priKey = multiBLSPriKey
Expand Down
15 changes: 6 additions & 9 deletions consensus/consensus_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ func (consensus *Consensus) updateBitmaps() {
Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps")
members := consensus.Decider.Participants()
prepareBitmap, _ := bls_cosi.NewMask(members, nil)
commitBitmap, _ := bls_cosi.NewMask(members, nil)
multiSigBitmap, _ := bls_cosi.NewMask(members, nil)
prepareBitmap := bls_cosi.NewMask(members)
commitBitmap := bls_cosi.NewMask(members)
multiSigBitmap := bls_cosi.NewMask(members)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
consensus.multiSigBitmap = multiSigBitmap
Expand Down Expand Up @@ -575,7 +575,7 @@ func (consensus *Consensus) selfCommit(payload []byte) error {
copy(blockHash[:], payload[:32])

// Leader sign and add commit message
block := consensus.FBFTLog.GetBlockByHash(blockHash)
block := consensus.fBFTLog.GetBlockByHash(blockHash)
if block == nil {
return errGetPreparedBlock
}
Expand Down Expand Up @@ -627,11 +627,8 @@ func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uin
count := uint32(0)
members := consensus.Decider.Participants()
// TODO(audit): do not reconstruct the Mask
mask, err := bls.NewMask(members, nil)
if err != nil {
return count
}
err = mask.SetMask(block.Header().LastCommitBitmap())
mask := bls.NewMask(members)
err := mask.SetMask(block.Header().LastCommitBitmap())
if err != nil {
return count
}
Expand Down
3 changes: 1 addition & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func TestConsensusInitialization(t *testing.T) {
assert.NoError(t, err)

messageSender := &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec}
fbtLog := NewFBFTLog()
state := State{mode: Normal}

timeouts := createTimeout()
Expand All @@ -37,7 +36,7 @@ func TestConsensusInitialization(t *testing.T) {
assert.IsType(t, make(chan struct{}), consensus.BlockNumLowChan)

// FBFTLog
assert.Equal(t, fbtLog, consensus.FBFTLog)
assert.NotNil(t, consensus.FBFTLog())

assert.Equal(t, FBFTAnnounce, consensus.phase)

Expand Down
33 changes: 13 additions & 20 deletions consensus/consensus_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ func (consensus *Consensus) finalCommit() {
network.Bytes,
network.FBFTMsg
commitSigAndBitmap := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)
// find correct block content
curBlockHash := consensus.blockHash
block := consensus.FBFTLog.GetBlockByHash(curBlockHash)
block := consensus.fBFTLog.GetBlockByHash(curBlockHash)
if block == nil {
consensus.getLogger().Warn().
Str("blockHash", hex.EncodeToString(curBlockHash[:])).
Expand Down Expand Up @@ -278,7 +278,7 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) {
lastCommits, err := consensus.Blockchain().ReadCommitSig(blockNum)
if err != nil ||
len(lastCommits) < bls.BLSSignatureSizeInBytes {
msgs := consensus.FBFTLog.GetMessagesByTypeSeq(
msgs := consensus.FBFTLog().GetMessagesByTypeSeq(
msg_pb.MessageType_COMMITTED, blockNum,
)
if len(msgs) != 1 {
Expand Down Expand Up @@ -482,7 +482,7 @@ func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *L
}
return cb(&LastMileBlockIter{
blockCandidates: blocks,
fbftLog: consensus.FBFTLog,
fbftLog: consensus.fBFTLog,
verify: consensus.BlockVerifier,
curIndex: 0,
logger: consensus.getLogger(),
Expand Down Expand Up @@ -513,7 +513,7 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl
msgs []*FBFTMessage
)
for blockNum := bnStart; ; blockNum++ {
blk, msg, err := consensus.FBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
blk, msg, err := consensus.fBFTLog.GetCommittedBlockAndMsgsFromNumber(blockNum, consensus.getLogger())
if err != nil {
if err == errFBFTLogNotFound {
break
Expand Down Expand Up @@ -551,7 +551,7 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
network.Bytes,
network.FBFTMsg
bareMinimumCommit := FBFTMsg.Payload
consensus.FBFTLog.AddVerifiedMessage(FBFTMsg)
consensus.fBFTLog.AddVerifiedMessage(FBFTMsg)

if err := consensus.verifyLastCommitSig(bareMinimumCommit, blk); err != nil {
return errors.Wrap(err, "[preCommitAndPropose] failed verifying last commit sig")
Expand All @@ -567,16 +567,16 @@ func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error {
nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID)),
},
p2p.ConstructMessage(msgToSend)); err != nil {
consensus.getLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
consensus.GetLogger().Warn().Err(err).Msg("[preCommitAndPropose] Cannot send committed message")
} else {
consensus.getLogger().Info().
consensus.GetLogger().Info().
Str("blockHash", blk.Hash().Hex()).
Uint64("blockNum", consensus.BlockNum()).
Hex("lastCommitSig", bareMinimumCommit).
Msg("[preCommitAndPropose] Sent Committed Message")
}

if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog().IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] Failed to add block to chain")
return
}
Expand Down Expand Up @@ -661,7 +661,7 @@ func (consensus *Consensus) tryCatchup() error {

func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error {
if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil {
if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.fBFTLog.IsBlockVerified(blk.Hash())); err != nil {
consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain")
return err
}
Expand Down Expand Up @@ -785,7 +785,7 @@ func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg
if blk.IsLastBlockInEpoch() {
consensus.setMode(consensus.updateConsensusInformation())
}
consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.fBFTLog.PruneCacheBeforeBlock(blk.NumberU64())
consensus.resetState()
}

Expand Down Expand Up @@ -920,19 +920,12 @@ func (consensus *Consensus) ValidateVdfAndProof(headerObj *block.Header) bool {
func (consensus *Consensus) DeleteBlocksLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlocksLessThan(number)
consensus.fBFTLog.deleteBlocksLessThan(number)
}

// DeleteMessagesLessThan deletes messages less than given block number.
func (consensus *Consensus) DeleteMessagesLessThan(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteMessagesLessThan(number)
}

// DeleteBlockByNumber deletes block by given block number.
func (consensus *Consensus) DeleteBlockByNumber(number uint64) {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
consensus.FBFTLog.deleteBlockByNumber(number)
consensus.fBFTLog.deleteMessagesLessThan(number)
}
6 changes: 1 addition & 5 deletions consensus/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,7 @@ func (consensus *Consensus) construct(
)
} else {
// TODO: use a persistent bitmap to report bitmap
mask, err := bls.NewMask(consensus.Decider.Participants(), nil)
if err != nil {
utils.Logger().Warn().Err(err).Msg("unable to setup mask for multi-sig message")
return nil, err
}
mask := bls.NewMask(consensus.Decider.Participants())
for _, key := range priKeys {
mask.SetKey(key.Pub.Bytes, true)
}
Expand Down
6 changes: 3 additions & 3 deletions consensus/double_sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (consensus *Consensus) checkDoubleSign(recvMsg *FBFTMessage) bool {
); alreadyCastBallot != nil {
for _, pubKey1 := range alreadyCastBallot.SignerPubKeys {
if bytes.Compare(pubKey2.Bytes[:], pubKey1[:]) == 0 {
for _, blk := range consensus.FBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
for _, blk := range consensus.fBFTLog.GetBlocksByNumber(recvMsg.BlockNum) {
firstSignedHeader := blk.Header()
areHeightsEqual := firstSignedHeader.Number().Uint64() == recvMsg.BlockNum
areViewIDsEqual := firstSignedHeader.ViewID().Uint64() == recvMsg.ViewID
Expand Down Expand Up @@ -138,8 +138,8 @@ func (consensus *Consensus) couldThisBeADoubleSigner(
recvMsg *FBFTMessage,
) bool {
num, hash := consensus.BlockNum(), recvMsg.BlockHash
suspicious := !consensus.FBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.FBFTLog.HasMatchingPrepared(num, hash)
suspicious := !consensus.fBFTLog.HasMatchingAnnounce(num, hash) ||
!consensus.fBFTLog.HasMatchingPrepared(num, hash)
if suspicious {
consensus.getLogger().Debug().
Str("message", recvMsg.String()).
Expand Down
21 changes: 21 additions & 0 deletions consensus/engine/consensus_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ type ChainReader interface {
// Config retrieves the blockchain's chain configuration.
Config() *params.ChainConfig

// TrieNode retrieves a blob of data associated with a trie node
// either from ephemeral in-memory cache, or from persistent storage.
TrieNode(hash common.Hash) ([]byte, error)

// ContractCode retrieves a blob of data associated with a contract
// hash either from ephemeral in-memory cache, or from persistent storage.
//
// If the code doesn't exist in the in-memory cache, check the storage with
// new code scheme.
ContractCode(hash common.Hash) ([]byte, error)

// ValidatorCode retrieves a blob of data associated with a validator
// hash either from ephemeral in-memory cache, or from persistent storage.
//
// If the code doesn't exist in the in-memory cache, check the storage with
// new code scheme.
ValidatorCode(hash common.Hash) ([]byte, error)

// GetReceiptsByHash retrieves the receipts for all transactions in a given block.
GetReceiptsByHash(hash common.Hash) types.Receipts

// CurrentHeader retrieves the current header from the local chain.
CurrentHeader() *block.Header

Expand Down

0 comments on commit 65e5cc2

Please sign in to comment.