Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/storage-pop-epochs-to-proce…
Browse files Browse the repository at this point in the history
…ss' into feat/storage-pop-epochs-to-process
  • Loading branch information
vgorkavenko committed May 8, 2024
2 parents 79ead6a + 45e4a41 commit 8388a26
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
39 changes: 21 additions & 18 deletions src/modules/csm/checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import cast

from timeout_decorator import TimeoutError as DecoratorTimeoutError

from src.modules.csm.state import State
from src.providers.consensus.client import ConsensusClient
from src.typings import EpochNumber, BlockRoot, SlotNumber, BlockStamp, ValidatorIndex
Expand Down Expand Up @@ -108,25 +110,26 @@ def _unprocessed():
yield _epoch

with ThreadPoolExecutor() as ext:
futures = {
ext.submit(self._process_epoch, last_finalized_blockstamp, duty_epoch): duty_epoch
for duty_epoch in _unprocessed()
}
for future in as_completed(futures):
duty_epoch = futures[future]
try:
try:
futures = {
ext.submit(self._process_epoch, last_finalized_blockstamp, duty_epoch)
for duty_epoch in _unprocessed()
}
for future in as_completed(futures):
future.result()
except Exception as e:
logger.error({
"msg": f"Error processing epoch {duty_epoch} in thread, " +
"wait the current threads and shutdown the executor",
"error": str(e)
})
# Wait only for the current running threads to prevent
# a lot of similar error-possible requests to the consensus node.
# Raise the error after a batch of running threads is finished
ext.shutdown(wait=True, cancel_futures=True)
raise ValueError(e) from e
except DecoratorTimeoutError as e:
logger.error({"msg": "Timeout processing epochs in threads", "error": str(e)})
# Don't wait the current running tasks to finish, cancel the rest and shutdown the executor
# To interrupt the current running tasks, we need to raise a special exception
ext.shutdown(wait=False, cancel_futures=True)
raise SystemExit(1) from e
except Exception as e:
logger.error({"msg": "Error processing epochs in threads, wait the current threads", "error": str(e)})
# Wait only for the current running threads to prevent
# a lot of similar error-possible requests to the consensus node.
# Raise the error after a batch of running threads is finished
ext.shutdown(wait=True, cancel_futures=True)
raise ValueError(e) from e

def _select_roots_to_check(
self, duty_epoch: EpochNumber
Expand Down
17 changes: 11 additions & 6 deletions src/modules/csm/csm.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,18 @@ def build_report(self, blockstamp: ReferenceBlockStamp) -> tuple:
no_id, amount = v["value"]
shares[no_id] += amount

# XXX: We put a stone here to make sure, that even with only 1 node operator in the tree, it's still possible to
# claim rewards. The CSModule contract skips pulling rewards if the proof's length is zero, which is the case
# when the tree has only one leaf.
if shares:
if distributed:
tree = Tree.new(tuple((no_id, amount) for (no_id, amount) in shares.items()))
logger.info({"msg": "New tree built for the report", "root": repr(tree.root)})
cid = self.w3.ipfs.upload(tree.encode())
self.w3.ipfs.pin(cid)
root = tree.root
shares[self.w3.csm.module.MAX_OPERATORS_COUNT] = 0 # type: ignore

if distributed:
tree = Tree.new(tuple((no_id, amount) for (no_id, amount) in shares.items()))
logger.info({"msg": "New tree built for the report", "root": repr(tree.root)})
cid = self.w3.ipfs.upload(tree.encode())
self.w3.ipfs.pin(cid)
root = tree.root

if root == ZERO_HASH:
logger.info({"msg": "No fee distributed so far, and tree doesn't exist"})
Expand Down
2 changes: 2 additions & 0 deletions src/providers/execution/contracts/CSModule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
class CSModule(Contract):
abi_path = "./assets/CSModule.json"

MAX_OPERATORS_COUNT = 2 ** 64

# TODO: Inherit from the base class.
def __init__(self, address: ChecksumAddress | None = None) -> None:
with open(self.abi_path, encoding="utf-8") as f:
Expand Down

0 comments on commit 8388a26

Please sign in to comment.