diff --git a/.circleci/config.yml b/.circleci/config.yml index 21b894f1f7f..478c84500a9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -390,7 +390,7 @@ jobs: build-appimage: machine: - image: ubuntu-2004:202104-01 + image: ubuntu-2004:202111-02 steps: - checkout - attach_workspace: @@ -398,6 +398,16 @@ jobs: - run: name: install appimage-builder command: | + # appimage-builder requires /dev/snd to exist. It creates containers during the testing phase + # that pass sound devices from the host to the testing container. (hard coded!) + # https://github.com/AppImageCrafters/appimage-builder/blob/master/appimagebuilder/modules/test/execution_test.py#L54 + # Circleci doesn't provide a working sound device; this is enough to fake it. + if [ ! -e /dev/snd ] + then + sudo mkdir /dev/snd + sudo mknod /dev/snd/ControlC0 c 1 2 + fi + # docs: https://appimage-builder.readthedocs.io/en/latest/intro/install.html sudo apt update sudo apt install -y python3-pip python3-setuptools patchelf desktop-file-utils libgdk-pixbuf2.0-dev fakeroot strace @@ -705,6 +715,17 @@ jobs: - packer/build: template: tools/packer/lotus.pkr.hcl args: "-var ci_workspace_bins=./linux-butterflynet -var lotus_network=butterflynet -var git_tag=$CIRCLE_TAG" + publish-packer-snap: + description: build packer image with snap. mainnet only. + executor: + name: packer/default + packer-version: 1.6.6 + steps: + - checkout + - attach_workspace: + at: "." + - packer/build: + template: tools/packer/lotus-snap.pkr.hcl publish-dockerhub: description: publish to dockerhub machine: @@ -991,10 +1012,19 @@ workflows: tags: only: - /^v\d+\.\d+\.\d+(-rc\d+)?$/ + - build-appimage: + filters: + branches: + ignore: + - /.*/ + tags: + only: + - /^v\d+\.\d+\.\d+(-rc\d+)?$/ - publish: requires: - build-all - build-macos + - build-appimage filters: branches: ignore: @@ -1073,3 +1103,13 @@ workflows: - publish-dockerhub: name: publish-dockerhub-nightly tag: nightly + monthly: + triggers: + - schedule: + cron: "0 0 1 * *" + filters: + branches: + only: + - master + jobs: + - publish-packer-snap diff --git a/.circleci/template.yml b/.circleci/template.yml index 8f5995d56d1..c8643e3ac5e 100644 --- a/.circleci/template.yml +++ b/.circleci/template.yml @@ -390,7 +390,7 @@ jobs: build-appimage: machine: - image: ubuntu-2004:202104-01 + image: ubuntu-2004:202111-02 steps: - checkout - attach_workspace: @@ -398,6 +398,16 @@ jobs: - run: name: install appimage-builder command: | + # appimage-builder requires /dev/snd to exist. It creates containers during the testing phase + # that pass sound devices from the host to the testing container. (hard coded!) + # https://github.com/AppImageCrafters/appimage-builder/blob/master/appimagebuilder/modules/test/execution_test.py#L54 + # Circleci doesn't provide a working sound device; this is enough to fake it. + if [ ! -e /dev/snd ] + then + sudo mkdir /dev/snd + sudo mknod /dev/snd/ControlC0 c 1 2 + fi + # docs: https://appimage-builder.readthedocs.io/en/latest/intro/install.html sudo apt update sudo apt install -y python3-pip python3-setuptools patchelf desktop-file-utils libgdk-pixbuf2.0-dev fakeroot strace @@ -705,6 +715,17 @@ jobs: - packer/build: template: tools/packer/lotus.pkr.hcl args: "-var ci_workspace_bins=./linux-butterflynet -var lotus_network=butterflynet -var git_tag=$CIRCLE_TAG" + publish-packer-snap: + description: build packer image with snap. mainnet only. + executor: + name: packer/default + packer-version: 1.6.6 + steps: + - checkout + - attach_workspace: + at: "." + - packer/build: + template: tools/packer/lotus-snap.pkr.hcl publish-dockerhub: description: publish to dockerhub machine: @@ -816,10 +837,19 @@ workflows: tags: only: - /^v\d+\.\d+\.\d+(-rc\d+)?$/ + - build-appimage: + filters: + branches: + ignore: + - /.*/ + tags: + only: + - /^v\d+\.\d+\.\d+(-rc\d+)?$/ - publish: requires: - build-all - build-macos + - build-appimage filters: branches: ignore: @@ -898,3 +928,13 @@ workflows: - publish-dockerhub: name: publish-dockerhub-nightly tag: nightly + monthly: + triggers: + - schedule: + cron: "0 0 1 * *" + filters: + branches: + only: + - master + jobs: + - publish-packer-snap diff --git a/.gitignore b/.gitignore index 467f315b8ef..33fbffa3c19 100644 --- a/.gitignore +++ b/.gitignore @@ -40,6 +40,7 @@ build/paramfetch.sh /bundle /darwin /linux +*.snap *-fuzz.zip /chain/types/work_msg/ diff --git a/AppImageBuilder.yml b/AppImageBuilder.yml index 19c74e4a26a..ff01b211229 100644 --- a/AppImageBuilder.yml +++ b/AppImageBuilder.yml @@ -49,23 +49,23 @@ AppDir: fedora: image: appimagecrafters/tests-env:fedora-30 command: ./AppRun - use_host_x: true + use_host_x: false debian: image: appimagecrafters/tests-env:debian-stable command: ./AppRun - use_host_x: true + use_host_x: false arch: image: appimagecrafters/tests-env:archlinux-latest command: ./AppRun - use_host_x: true + use_host_x: false centos: image: appimagecrafters/tests-env:centos-7 command: ./AppRun - use_host_x: true + use_host_x: false ubuntu: image: appimagecrafters/tests-env:ubuntu-xenial command: ./AppRun - use_host_x: true + use_host_x: false AppImage: arch: x86_64 update-information: guess diff --git a/api/api_full.go b/api/api_full.go index cf58a3cc6ee..4c4d6ebf936 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -689,7 +689,17 @@ type FullNode interface { // MethodGroup: Paych // The Paych methods are for interacting with and managing payment channels - PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign + // PaychGet gets or creates a payment channel between address pair + // The specified amount will be reserved for use. If there aren't enough non-reserved funds + // available, funds will be added through an on-chain message. + // - When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic + // channel creation/funds adding). If the operation can't be performed without sending a message an error will be + // returned. Note that even when this option is specified, this call can be blocked by previous operations on the + // channel waiting for on-chain operations. + PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts PaychGetOpts) (*ChannelInfo, error) //perm:sign + // PaychFund gets or creates a payment channel between address pair. + // The specified amount will be added to the channel through on-chain send for future use + PaychFund(ctx context.Context, from, to address.Address, amt types.BigInt) (*ChannelInfo, error) //perm:sign PaychGetWaitReady(context.Context, cid.Cid) (address.Address, error) //perm:sign PaychAvailableFunds(ctx context.Context, ch address.Address) (*ChannelAvailableFunds, error) //perm:sign PaychAvailableFundsByFromTo(ctx context.Context, from, to address.Address) (*ChannelAvailableFunds, error) //perm:sign @@ -828,6 +838,10 @@ const ( PCHOutbound ) +type PaychGetOpts struct { + OffChain bool +} + type PaychStatus struct { ControlAddr address.Address Direction PCHDir @@ -845,16 +859,23 @@ type ChannelAvailableFunds struct { From address.Address // To is the to address of the channel To address.Address - // ConfirmedAmt is the amount of funds that have been confirmed on-chain - // for the channel + + // ConfirmedAmt is the total amount of funds that have been confirmed on-chain for the channel ConfirmedAmt types.BigInt // PendingAmt is the amount of funds that are pending confirmation on-chain PendingAmt types.BigInt + + // NonReservedAmt is part of ConfirmedAmt that is available for use (e.g. when the payment channel was pre-funded) + NonReservedAmt types.BigInt + // PendingAvailableAmt is the amount of funds that are pending confirmation on-chain that will become available once confirmed + PendingAvailableAmt types.BigInt + // PendingWaitSentinel can be used with PaychGetWaitReady to wait for // confirmation of pending funds PendingWaitSentinel *cid.Cid // QueuedAmt is the amount that is queued up behind a pending request QueuedAmt types.BigInt + // VoucherRedeemedAmt is the amount that is redeemed by vouchers on-chain // and in the local datastore VoucherReedeemedAmt types.BigInt diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 49c49e22ea3..1dce7fa9c39 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -2018,19 +2018,34 @@ func (mr *MockFullNodeMockRecorder) PaychCollect(arg0, arg1 interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychCollect", reflect.TypeOf((*MockFullNode)(nil).PaychCollect), arg0, arg1) } +// PaychFund mocks base method. +func (m *MockFullNode) PaychFund(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int) (*api.ChannelInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PaychFund", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*api.ChannelInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PaychFund indicates an expected call of PaychFund. +func (mr *MockFullNodeMockRecorder) PaychFund(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychFund", reflect.TypeOf((*MockFullNode)(nil).PaychFund), arg0, arg1, arg2, arg3) +} + // PaychGet mocks base method. -func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int) (*api.ChannelInfo, error) { +func (m *MockFullNode) PaychGet(arg0 context.Context, arg1, arg2 address.Address, arg3 big.Int, arg4 api.PaychGetOpts) (*api.ChannelInfo, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "PaychGet", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(*api.ChannelInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // PaychGet indicates an expected call of PaychGet. -func (mr *MockFullNodeMockRecorder) PaychGet(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockFullNodeMockRecorder) PaychGet(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychGet", reflect.TypeOf((*MockFullNode)(nil).PaychGet), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PaychGet", reflect.TypeOf((*MockFullNode)(nil).PaychGet), arg0, arg1, arg2, arg3, arg4) } // PaychGetWaitReady mocks base method. diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 756f38df958..fa907fab792 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -306,7 +306,9 @@ type FullNodeStruct struct { PaychCollect func(p0 context.Context, p1 address.Address) (cid.Cid, error) `perm:"sign"` - PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"` + PaychFund func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) `perm:"sign"` + + PaychGet func(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) `perm:"sign"` PaychGetWaitReady func(p0 context.Context, p1 cid.Cid) (address.Address, error) `perm:"sign"` @@ -2199,14 +2201,25 @@ func (s *FullNodeStub) PaychCollect(p0 context.Context, p1 address.Address) (cid return *new(cid.Cid), ErrNotSupported } -func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { +func (s *FullNodeStruct) PaychFund(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { + if s.Internal.PaychFund == nil { + return nil, ErrNotSupported + } + return s.Internal.PaychFund(p0, p1, p2, p3) +} + +func (s *FullNodeStub) PaychFund(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { + return nil, ErrNotSupported +} + +func (s *FullNodeStruct) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) { if s.Internal.PaychGet == nil { return nil, ErrNotSupported } - return s.Internal.PaychGet(p0, p1, p2, p3) + return s.Internal.PaychGet(p0, p1, p2, p3, p4) } -func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt) (*ChannelInfo, error) { +func (s *FullNodeStub) PaychGet(p0 context.Context, p1 address.Address, p2 address.Address, p3 types.BigInt, p4 PaychGetOpts) (*ChannelInfo, error) { return nil, ErrNotSupported } diff --git a/api/v0api/v1_wrapper.go b/api/v0api/v1_wrapper.go index 7e0d7a94ab6..3f2dd837391 100644 --- a/api/v0api/v1_wrapper.go +++ b/api/v0api/v1_wrapper.go @@ -337,4 +337,8 @@ func (w *WrapperV1Full) clientRetrieve(ctx context.Context, order RetrievalOrder finish(w.ClientExport(ctx, eref, *ref)) } +func (w *WrapperV1Full) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { + return w.FullNode.PaychFund(ctx, from, to, amt) +} + var _ FullNode = &WrapperV1Full{} diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 19d932360ad..2261c102e43 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 03e78adc597..8475edc774d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index fed8613326b..465501bc648 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/sync.go b/chain/sync.go index 0293ae25ecb..a34a83d76f1 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1244,25 +1244,3 @@ func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) { bbr, ok := syncer.bad.Has(blk) return bbr.String(), ok } - -func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) { - cur := ts - for i := 0; i < 20; i++ { - cbe := cur.Blocks()[0].BeaconEntries - if len(cbe) > 0 { - return &cbe[len(cbe)-1], nil - } - - if cur.Height() == 0 { - return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry") - } - - next, err := syncer.store.LoadTipSet(ctx, cur.Parents()) - if err != nil { - return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err) - } - cur = next - } - - return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets") -} diff --git a/chain/sync_manager_test.go b/chain/sync_manager_test.go index 5f23e67c071..bbd690d230c 100644 --- a/chain/sync_manager_test.go +++ b/chain/sync_manager_test.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" + "github.com/stretchr/testify/require" ) func init() { @@ -240,3 +241,34 @@ func TestSyncManager(t *testing.T) { op3.done() }) } + +func TestSyncManagerBucketSet(t *testing.T) { + ts1 := mock.TipSet(mock.MkBlock(nil, 0, 0)) + ts2 := mock.TipSet(mock.MkBlock(ts1, 1, 0)) + bucket1 := newSyncTargetBucket(ts1, ts2) + bucketSet := syncBucketSet{buckets: []*syncTargetBucket{bucket1}} + + // inserting a tipset (potential sync target) from an existing chain, should add to an existing bucket + //stm: @CHAIN_SYNCER_ADD_SYNC_TARGET_001 + ts3 := mock.TipSet(mock.MkBlock(ts2, 2, 0)) + bucketSet.Insert(ts3) + require.Equal(t, 1, len(bucketSet.buckets)) + require.Equal(t, 3, len(bucketSet.buckets[0].tips)) + + // inserting a tipset from new chain, should create a new bucket + ts4fork := mock.TipSet(mock.MkBlock(nil, 1, 1)) + bucketSet.Insert(ts4fork) + require.Equal(t, 2, len(bucketSet.buckets)) + require.Equal(t, 3, len(bucketSet.buckets[0].tips)) + require.Equal(t, 1, len(bucketSet.buckets[1].tips)) + + // Pop removes the best bucket (best sync target), e.g. bucket1 + //stm: @CHAIN_SYNCER_SELECT_SYNC_TARGET_001 + popped := bucketSet.Pop() + require.Equal(t, popped, bucket1) + require.Equal(t, 1, len(bucketSet.buckets)) + + // PopRelated removes the bucket containing the given tipset, leaving the set empty + bucketSet.PopRelated(ts4fork) + require.Equal(t, 0, len(bucketSet.buckets)) +} diff --git a/chain/sync_test.go b/chain/sync_test.go index 35566169f2c..96ed1440e92 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -1098,3 +1098,158 @@ func TestInvalidHeight(t *testing.T) { tu.mineOnBlock(base, 0, nil, false, true, nil, -1, true) } + +// TestIncomingBlocks mines new blocks and checks if the incoming channel streams new block headers properly +func TestIncomingBlocks(t *testing.T) { + H := 50 + tu := prepSyncTest(t, H) + + client := tu.addClientNode() + require.NoError(t, tu.mn.LinkAll()) + + clientNode := tu.nds[client] + //stm: @CHAIN_SYNCER_INCOMING_BLOCKS_001 + incoming, err := clientNode.SyncIncomingBlocks(tu.ctx) + require.NoError(tu.t, err) + + tu.connect(client, 0) + tu.waitUntilSync(0, client) + tu.compareSourceState(client) + + timeout := time.After(10 * time.Second) + + for i := 0; i < 5; i++ { + tu.mineNewBlock(0, nil) + tu.waitUntilSync(0, client) + tu.compareSourceState(client) + + // just in case, so we don't get deadlocked + select { + case <-incoming: + case <-timeout: + tu.t.Fatal("TestIncomingBlocks timeout") + } + } +} + +// TestSyncManualBadTS tests manually marking and unmarking blocks in the bad TS cache +func TestSyncManualBadTS(t *testing.T) { + // Test setup: + // - source node is fully synced, + // - client node is unsynced + // - client manually marked source's head and it's parent as bad + H := 50 + tu := prepSyncTest(t, H) + + client := tu.addClientNode() + require.NoError(t, tu.mn.LinkAll()) + + sourceHead, err := tu.nds[source].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + clientHead, err := tu.nds[client].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync in test setup") + + //stm: @CHAIN_SYNCER_MARK_BAD_001 + err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHead.Cids()[0]) + require.NoError(tu.t, err) + + sourceHeadParent := sourceHead.Parents().Cids()[0] + err = tu.nds[client].SyncMarkBad(tu.ctx, sourceHeadParent) + require.NoError(tu.t, err) + + //stm: @CHAIN_SYNCER_CHECK_BAD_001 + reason, err := tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0]) + require.NoError(tu.t, err) + require.NotEqual(tu.t, "", reason, "block is not bad after manually marking") + + reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent) + require.NoError(tu.t, err) + require.NotEqual(tu.t, "", reason, "block is not bad after manually marking") + + // Assertion 1: + // - client shouldn't be synced after timeout, because the source TS is marked bad. + // - bad block is the first block that should be synced, 1sec should be enough + tu.connect(1, 0) + timeout := time.After(1 * time.Second) + <-timeout + + clientHead, err = tu.nds[client].ChainHead(tu.ctx) + require.NoError(tu.t, err) + require.True(tu.t, !sourceHead.Equals(clientHead), "source and client should be out of sync if source head is bad") + + // Assertion 2: + // - after unmarking blocks as bad and reconnecting, source & client should be in sync + //stm: @CHAIN_SYNCER_UNMARK_BAD_001 + err = tu.nds[client].SyncUnmarkBad(tu.ctx, sourceHead.Cids()[0]) + require.NoError(tu.t, err) + + reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHead.Cids()[0]) + require.NoError(tu.t, err) + require.Equal(tu.t, "", reason, "block is still bad after manually unmarking") + + err = tu.nds[client].SyncUnmarkAllBad(tu.ctx) + require.NoError(tu.t, err) + + reason, err = tu.nds[client].SyncCheckBad(tu.ctx, sourceHeadParent) + require.NoError(tu.t, err) + require.Equal(tu.t, "", reason, "block is still bad after manually unmarking") + + tu.disconnect(1, 0) + tu.connect(1, 0) + + tu.waitUntilSync(0, client) + tu.compareSourceState(client) +} + +// TestState tests fetching the sync worker state before, during & after the sync +func TestSyncState(t *testing.T) { + H := 50 + tu := prepSyncTest(t, H) + + client := tu.addClientNode() + require.NoError(t, tu.mn.LinkAll()) + clientNode := tu.nds[client] + sourceHead, err := tu.nds[source].ChainHead(tu.ctx) + require.NoError(tu.t, err) + + // sync state should be empty before the sync + state, err := clientNode.SyncState(tu.ctx) + require.NoError(tu.t, err) + require.Equal(tu.t, len(state.ActiveSyncs), 0) + + tu.connect(client, 0) + + // wait until sync starts, or at most `timeout` seconds + timeout := time.After(5 * time.Second) + activeSyncs := []api.ActiveSync{} + + for len(activeSyncs) == 0 { + //stm: @CHAIN_SYNCER_STATE_001 + state, err = clientNode.SyncState(tu.ctx) + require.NoError(tu.t, err) + activeSyncs = state.ActiveSyncs + + sleep := time.After(100 * time.Millisecond) + select { + case <-sleep: + case <-timeout: + tu.t.Fatal("TestSyncState timeout") + } + } + + // check state during sync + require.Equal(tu.t, len(activeSyncs), 1) + require.True(tu.t, activeSyncs[0].Target.Equals(sourceHead)) + + tu.waitUntilSync(0, client) + tu.compareSourceState(client) + + // check state after sync + state, err = clientNode.SyncState(tu.ctx) + require.NoError(tu.t, err) + require.Equal(tu.t, len(state.ActiveSyncs), 1) + require.Equal(tu.t, state.ActiveSyncs[0].Stage, api.StageSyncComplete) +} diff --git a/cli/client.go b/cli/client.go index 64114fa171b..da725a7be62 100644 --- a/cli/client.go +++ b/cli/client.go @@ -358,7 +358,13 @@ The minimum value is 518400 (6 months).`, &CidBaseFlag, }, Action: func(cctx *cli.Context) error { + + expectedArgsMsg := "expected 4 args: dataCid, miner, price, duration" + if !cctx.Args().Present() { + if cctx.Bool("manual-stateless-deal") { + return xerrors.New("--manual-stateless-deal can not be combined with interactive deal mode: you must specify the " + expectedArgsMsg) + } return interactiveDeal(cctx) } @@ -371,7 +377,7 @@ The minimum value is 518400 (6 months).`, afmt := NewAppFmt(cctx.App) if cctx.NArg() != 4 { - return xerrors.New("expected 4 args: dataCid, miner, price, duration") + return xerrors.New(expectedArgsMsg) } // [data, miner, price, dur] diff --git a/cli/paych.go b/cli/paych.go index 1d5e304c383..92c1a13e31f 100644 --- a/cli/paych.go +++ b/cli/paych.go @@ -8,7 +8,7 @@ import ( "sort" "strings" - "github.com/filecoin-project/lotus/api" + lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/paychmgr" @@ -39,12 +39,15 @@ var paychAddFundsCmd = &cli.Command{ Usage: "Add funds to the payment channel between fromAddress and toAddress. Creates the payment channel if it doesn't already exist.", ArgsUsage: "[fromAddress toAddress amount]", Flags: []cli.Flag{ - &cli.BoolFlag{ Name: "restart-retrievals", Usage: "restart stalled retrieval deals on this payment channel", Value: true, }, + &cli.BoolFlag{ + Name: "reserve", + Usage: "mark funds as reserved", + }, }, Action: func(cctx *cli.Context) error { if cctx.Args().Len() != 3 { @@ -66,7 +69,7 @@ var paychAddFundsCmd = &cli.Command{ return ShowHelp(cctx, fmt.Errorf("parsing amount failed: %s", err)) } - api, closer, err := GetFullNodeAPI(cctx) + api, closer, err := GetFullNodeAPIV1(cctx) if err != nil { return err } @@ -76,7 +79,14 @@ var paychAddFundsCmd = &cli.Command{ // Send a message to chain to create channel / add funds to existing // channel - info, err := api.PaychGet(ctx, from, to, types.BigInt(amt)) + var info *lapi.ChannelInfo + if cctx.Bool("reserve") { + info, err = api.PaychGet(ctx, from, to, types.BigInt(amt), lapi.PaychGetOpts{ + OffChain: false, + }) + } else { + info, err = api.PaychFund(ctx, from, to, types.BigInt(amt)) + } if err != nil { return err } @@ -163,13 +173,13 @@ var paychStatusCmd = &cli.Command{ }, } -func paychStatus(writer io.Writer, avail *api.ChannelAvailableFunds) { +func paychStatus(writer io.Writer, avail *lapi.ChannelAvailableFunds) { if avail.Channel == nil { if avail.PendingWaitSentinel != nil { fmt.Fprint(writer, "Creating channel\n") fmt.Fprintf(writer, " From: %s\n", avail.From) fmt.Fprintf(writer, " To: %s\n", avail.To) - fmt.Fprintf(writer, " Pending Amt: %d\n", avail.PendingAmt) + fmt.Fprintf(writer, " Pending Amt: %s\n", types.FIL(avail.PendingAmt)) fmt.Fprintf(writer, " Wait Sentinel: %s\n", avail.PendingWaitSentinel) return } @@ -189,10 +199,12 @@ func paychStatus(writer io.Writer, avail *api.ChannelAvailableFunds) { {"Channel", avail.Channel.String()}, {"From", avail.From.String()}, {"To", avail.To.String()}, - {"Confirmed Amt", fmt.Sprintf("%d", avail.ConfirmedAmt)}, - {"Pending Amt", fmt.Sprintf("%d", avail.PendingAmt)}, - {"Queued Amt", fmt.Sprintf("%d", avail.QueuedAmt)}, - {"Voucher Redeemed Amt", fmt.Sprintf("%d", avail.VoucherReedeemedAmt)}, + {"Confirmed Amt", fmt.Sprintf("%s", types.FIL(avail.ConfirmedAmt))}, + {"Available Amt", fmt.Sprintf("%s", types.FIL(avail.NonReservedAmt))}, + {"Voucher Redeemed Amt", fmt.Sprintf("%s", types.FIL(avail.VoucherReedeemedAmt))}, + {"Pending Amt", fmt.Sprintf("%s", types.FIL(avail.PendingAmt))}, + {"Pending Available Amt", fmt.Sprintf("%s", types.FIL(avail.PendingAvailableAmt))}, + {"Queued Amt", fmt.Sprintf("%s", types.FIL(avail.QueuedAmt))}, } if avail.PendingWaitSentinel != nil { nameValues = append(nameValues, []string{ @@ -576,7 +588,7 @@ func outputVoucher(w io.Writer, v *paych.SignedVoucher, export bool) error { } } - fmt.Fprintf(w, "Lane %d, Nonce %d: %s", v.Lane, v.Nonce, v.Amount.String()) + fmt.Fprintf(w, "Lane %d, Nonce %d: %s", v.Lane, v.Nonce, types.FIL(v.Amount)) if export { fmt.Fprintf(w, "; %s", enc) } diff --git a/cli/wallet.go b/cli/wallet.go index 9faa10677dd..752a80bfbd9 100644 --- a/cli/wallet.go +++ b/cli/wallet.go @@ -56,6 +56,8 @@ var walletNew = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + t := cctx.Args().First() if t == "" { t = "secp256k1" @@ -66,7 +68,7 @@ var walletNew = &cli.Command{ return err } - fmt.Println(nk.String()) + afmt.Println(nk.String()) return nil }, @@ -100,6 +102,8 @@ var walletList = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + addrs, err := api.WalletList(ctx) if err != nil { return err @@ -120,7 +124,7 @@ var walletList = &cli.Command{ for _, addr := range addrs { if cctx.Bool("addr-only") { - fmt.Println(addr.String()) + afmt.Println(addr.String()) } else { a, err := api.StateGetActor(ctx, addr, types.EmptyTSK) if err != nil { @@ -187,6 +191,8 @@ var walletBalance = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + var addr address.Address if cctx.Args().First() != "" { addr, err = address.NewFromString(cctx.Args().First()) @@ -203,9 +209,9 @@ var walletBalance = &cli.Command{ } if balance.Equals(types.NewInt(0)) { - fmt.Printf("%s (warning: may display 0 if chain sync in progress)\n", types.FIL(balance)) + afmt.Printf("%s (warning: may display 0 if chain sync in progress)\n", types.FIL(balance)) } else { - fmt.Printf("%s\n", types.FIL(balance)) + afmt.Printf("%s\n", types.FIL(balance)) } return nil @@ -223,12 +229,14 @@ var walletGetDefault = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + addr, err := api.WalletDefaultAddress(ctx) if err != nil { return err } - fmt.Printf("%s\n", addr.String()) + afmt.Printf("%s\n", addr.String()) return nil }, } @@ -270,6 +278,8 @@ var walletExport = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + if !cctx.Args().Present() { return fmt.Errorf("must specify key to export") } @@ -289,7 +299,7 @@ var walletExport = &cli.Command{ return err } - fmt.Println(hex.EncodeToString(b)) + afmt.Println(hex.EncodeToString(b)) return nil }, } @@ -403,6 +413,8 @@ var walletSign = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + if !cctx.Args().Present() || cctx.NArg() != 2 { return fmt.Errorf("must specify signing address and message to sign") } @@ -427,7 +439,7 @@ var walletSign = &cli.Command{ sigBytes := append([]byte{byte(sig.Type)}, sig.Data...) - fmt.Println(hex.EncodeToString(sigBytes)) + afmt.Println(hex.EncodeToString(sigBytes)) return nil }, } @@ -444,6 +456,8 @@ var walletVerify = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + if !cctx.Args().Present() || cctx.NArg() != 3 { return fmt.Errorf("must specify signing address, message, and signature to verify") } @@ -476,10 +490,10 @@ var walletVerify = &cli.Command{ return err } if ok { - fmt.Println("valid") + afmt.Println("valid") return nil } - fmt.Println("invalid") + afmt.Println("invalid") return NewCliError("CLI Verify called with invalid signature") }, } @@ -547,6 +561,8 @@ var walletMarketWithdraw = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + var wallet address.Address if cctx.String("wallet") != "" { wallet, err = address.NewFromString(cctx.String("wallet")) @@ -622,7 +638,7 @@ var walletMarketWithdraw = &cli.Command{ return xerrors.Errorf("fund manager withdraw error: %w", err) } - fmt.Printf("WithdrawBalance message cid: %s\n", smsg) + afmt.Printf("WithdrawBalance message cid: %s\n", smsg) // wait for it to get mined into a block wait, err := api.StateWaitMsg(ctx, smsg, uint64(cctx.Int("confidence"))) @@ -632,7 +648,7 @@ var walletMarketWithdraw = &cli.Command{ // check it executed successfully if wait.Receipt.ExitCode != 0 { - fmt.Println(cctx.App.Writer, "withdrawal failed!") + afmt.Println(cctx.App.Writer, "withdrawal failed!") return err } @@ -647,7 +663,7 @@ var walletMarketWithdraw = &cli.Command{ return err } - fmt.Printf("Successfully withdrew %s \n", types.FIL(withdrawn)) + afmt.Printf("Successfully withdrew %s \n", types.FIL(withdrawn)) if withdrawn.LessThan(amt) { fmt.Printf("Note that this is less than the requested amount of %s \n", types.FIL(amt)) } @@ -681,6 +697,8 @@ var walletMarketAdd = &cli.Command{ defer closer() ctx := ReqContext(cctx) + afmt := NewAppFmt(cctx.App) + // Get amount param if !cctx.Args().Present() { return fmt.Errorf("must pass amount to add") @@ -722,7 +740,7 @@ var walletMarketAdd = &cli.Command{ return xerrors.Errorf("add balance error: %w", err) } - fmt.Printf("AddBalance message cid: %s\n", smsg) + afmt.Printf("AddBalance message cid: %s\n", smsg) return nil }, diff --git a/cli/wallet_test.go b/cli/wallet_test.go new file mode 100644 index 00000000000..22b3bc3e4fa --- /dev/null +++ b/cli/wallet_test.go @@ -0,0 +1,333 @@ +//stm: #cli +package cli + +import ( + "context" + "encoding/hex" + "encoding/json" + "fmt" + "testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/api" + apitypes "github.com/filecoin-project/lotus/api/types" + types "github.com/filecoin-project/lotus/chain/types" + "github.com/golang/mock/gomock" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +func TestWalletNew(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletNew)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + keyType := types.KeyType("secp256k1") + address, err := address.NewFromString("t0123") + assert.NoError(t, err) + + mockApi.EXPECT().WalletNew(ctx, keyType).Return(address, nil) + + //stm: @CLI_WALLET_NEW_001 + err = app.Run([]string{"wallet", "new"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), address.String()) +} + +func TestWalletList(t *testing.T) { + + addr, err := address.NewIDAddress(1234) + addresses := []address.Address{addr} + assert.NoError(t, err) + + cid := cid.Cid{} + key := types.NewTipSetKey(cid) + + actor := types.Actor{ + Code: cid, + Head: cid, + Nonce: 0, + Balance: big.NewInt(100), + } + + t.Run("wallet-list-addr-only", func(t *testing.T) { + + app, mockApi, buf, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletList)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gomock.InOrder( + mockApi.EXPECT().WalletList(ctx).Return(addresses, nil), + mockApi.EXPECT().WalletDefaultAddress(ctx).Return(addr, nil), + ) + + //stm: @CLI_WALLET_LIST_001 + err := app.Run([]string{"wallet", "list", "--addr-only"}) + assert.NoError(t, err) + assert.Contains(t, buf.String(), addr.String()) + }) + t.Run("wallet-list-id", func(t *testing.T) { + + app, mockApi, _, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletList)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gomock.InOrder( + mockApi.EXPECT().WalletList(ctx).Return(addresses, nil), + mockApi.EXPECT().WalletDefaultAddress(ctx).Return(addr, nil), + mockApi.EXPECT().StateGetActor(ctx, addr, key).Return(&actor, nil), + mockApi.EXPECT().StateLookupID(ctx, addr, key).Return(addr, nil), + ) + + //stm: @CLI_WALLET_LIST_002 + err := app.Run([]string{"wallet", "list", "--id"}) + assert.NoError(t, err) + }) + t.Run("wallet-list-market", func(t *testing.T) { + + app, mockApi, _, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletList)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + balance := api.MarketBalance{ + Escrow: big.NewInt(1234), + Locked: big.NewInt(123), + } + + gomock.InOrder( + mockApi.EXPECT().WalletList(ctx).Return(addresses, nil), + mockApi.EXPECT().WalletDefaultAddress(ctx).Return(addr, nil), + mockApi.EXPECT().StateGetActor(ctx, addr, key).Return(&actor, nil), + mockApi.EXPECT().StateMarketBalance(ctx, addr, key).Return(balance, nil), + ) + + //stm: @CLI_WALLET_LIST_003 + err := app.Run([]string{"wallet", "list", "--market"}) + assert.NoError(t, err) + }) +} + +func TestWalletBalance(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletBalance)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + balance := big.NewInt(1234) + + mockApi.EXPECT().WalletBalance(ctx, addr).Return(balance, nil) + + //stm: @CLI_WALLET_BALANCE_001 + err = app.Run([]string{"wallet", "balance", "f01234"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), balance.String()) +} + +func TestWalletGetDefault(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletGetDefault)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewFromString("t0123") + assert.NoError(t, err) + + mockApi.EXPECT().WalletDefaultAddress(ctx).Return(addr, nil) + + //stm: @CLI_WALLET_GET_DEFAULT_001 + err = app.Run([]string{"wallet", "default"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), addr.String()) +} + +func TestWalletSetDefault(t *testing.T) { + app, mockApi, _, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletSetDefault)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + mockApi.EXPECT().WalletSetDefault(ctx, addr).Return(nil) + + //stm: @CLI_WALLET_SET_DEFAULT_001 + err = app.Run([]string{"wallet", "set-default", "f01234"}) + assert.NoError(t, err) +} + +func TestWalletExport(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletExport)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + keyInfo := types.KeyInfo{ + Type: types.KTSecp256k1, + PrivateKey: []byte("0x000000000000000000001"), + } + + mockApi.EXPECT().WalletExport(ctx, addr).Return(&keyInfo, nil) + + ki, err := json.Marshal(keyInfo) + assert.NoError(t, err) + + //stm: @CLI_WALLET_EXPORT_001 + err = app.Run([]string{"wallet", "export", "f01234"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), hex.EncodeToString(ki)) +} + +func TestWalletSign(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletSign)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewFromString("f01234") + assert.NoError(t, err) + + msg, err := hex.DecodeString("01") + assert.NoError(t, err) + + signature := crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte{0x01}, + } + + mockApi.EXPECT().WalletSign(ctx, addr, msg).Return(&signature, nil) + + sigBytes := append([]byte{byte(signature.Type)}, signature.Data...) + + //stm: @CLI_WALLET_SIGN_001 + err = app.Run([]string{"wallet", "sign", "f01234", "01"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), hex.EncodeToString(sigBytes)) +} + +func TestWalletVerify(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletVerify)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + msg := []byte{1} + signature := crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte{}, + } + + mockApi.EXPECT().WalletVerify(ctx, addr, msg, &signature).Return(true, nil) + + //stm: @CLI_WALLET_VERIFY_001 + err = app.Run([]string{"wallet", "verify", "f01234", "01", "01"}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), "valid") +} + +func TestWalletDelete(t *testing.T) { + app, mockApi, _, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletDelete)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + mockApi.EXPECT().WalletDelete(ctx, addr).Return(nil) + + //stm: @CLI_WALLET_DELETE_001 + err = app.Run([]string{"wallet", "delete", "f01234"}) + assert.NoError(t, err) +} + +func TestWalletMarketWithdraw(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletMarket)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + addr, err := address.NewIDAddress(1234) + assert.NoError(t, err) + + balance := api.MarketBalance{ + Escrow: big.NewInt(100), + Locked: big.NewInt(10), + } + + h, err := hex.DecodeString("12209cbc07c3f991725836a3aa2a581ca2029198aa420b9d99bc0e131d9f3e2cbe47") + assert.NoError(t, err) + cid := cid.NewCidV0(multihash.Multihash(h)) + msgLookup := api.MsgLookup{} + + var networkVers apitypes.NetworkVersion + + gomock.InOrder( + mockApi.EXPECT().StateMarketBalance(ctx, addr, types.TipSetKey{}).Return(balance, nil), + // mock reserve to 10 + mockApi.EXPECT().MarketGetReserved(ctx, addr).Return(big.NewInt(10), nil), + // available should be 80.. escrow - locked - reserve + mockApi.EXPECT().MarketWithdraw(ctx, addr, addr, big.NewInt(80)).Return(cid, nil), + mockApi.EXPECT().StateWaitMsg(ctx, cid, uint64(5), abi.ChainEpoch(int64(-1)), true).Return(&msgLookup, nil), + mockApi.EXPECT().StateNetworkVersion(ctx, types.TipSetKey{}).Return(networkVers, nil), + ) + + //stm: @CLI_WALLET_MARKET_WITHDRAW_001 + err = app.Run([]string{"wallet", "market", "withdraw", "--wallet", addr.String()}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), fmt.Sprintf("WithdrawBalance message cid: %s", cid)) +} + +func TestWalletMarketAdd(t *testing.T) { + app, mockApi, buffer, done := NewMockAppWithFullAPI(t, WithCategory("wallet", walletMarket)) + defer done() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + toAddr := address.Address{} + defaultAddr := address.Address{} + + h, err := hex.DecodeString("12209cbc07c3f991725836a3aa2a581ca2029198aa420b9d99bc0e131d9f3e2cbe47") + assert.NoError(t, err) + cid := cid.NewCidV0(multihash.Multihash(h)) + + gomock.InOrder( + mockApi.EXPECT().WalletDefaultAddress(ctx).Return(defaultAddr, nil), + mockApi.EXPECT().MarketAddBalance(ctx, defaultAddr, toAddr, big.NewInt(80)).Return(cid, nil), + ) + + //stm: @CLI_WALLET_MARKET_ADD_001 + err = app.Run([]string{"wallet", "market", "add", "0.000000000000000080", "--address", toAddr.String()}) + assert.NoError(t, err) + assert.Contains(t, buffer.String(), fmt.Sprintf("AddBalance message cid: %s", cid)) +} diff --git a/cmd/lotus-shed/diff.go b/cmd/lotus-shed/diff.go index bcaa041227e..d576f73b4eb 100644 --- a/cmd/lotus-shed/diff.go +++ b/cmd/lotus-shed/diff.go @@ -35,7 +35,7 @@ var diffStateTrees = &cli.Command{ return xerrors.Errorf("expected two state-tree roots") } - argA := cctx.Args().Get(1) + argA := cctx.Args().Get(0) rootA, err := cid.Parse(argA) if err != nil { return xerrors.Errorf("first state-tree root (%q) is not a CID: %w", argA, err) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 813a0a9bd9f..f285ba74e63 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -119,9 +119,8 @@ var DaemonCmd = &cli.Command{ Usage: "halt the process after importing chain from file", }, &cli.BoolFlag{ - Name: "lite", - Usage: "start lotus in lite mode", - Hidden: true, + Name: "lite", + Usage: "start lotus in lite mode", }, &cli.StringFlag{ Name: "pprof", diff --git a/documentation/en/api-v0-methods.md b/documentation/en/api-v0-methods.md index 16e86775204..eb195df8a7a 100644 --- a/documentation/en/api-v0-methods.md +++ b/documentation/en/api-v0-methods.md @@ -4110,6 +4110,8 @@ Response: "To": "f01234", "ConfirmedAmt": "0", "PendingAmt": "0", + "NonReservedAmt": "0", + "PendingAvailableAmt": "0", "PendingWaitSentinel": null, "QueuedAmt": "0", "VoucherReedeemedAmt": "0" @@ -4137,6 +4139,8 @@ Response: "To": "f01234", "ConfirmedAmt": "0", "PendingAmt": "0", + "NonReservedAmt": "0", + "PendingAvailableAmt": "0", "PendingWaitSentinel": null, "QueuedAmt": "0", "VoucherReedeemedAmt": "0" diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 0ff7bc8c873..48b4540fef7 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -150,6 +150,7 @@ * [PaychAvailableFunds](#PaychAvailableFunds) * [PaychAvailableFundsByFromTo](#PaychAvailableFundsByFromTo) * [PaychCollect](#PaychCollect) + * [PaychFund](#PaychFund) * [PaychGet](#PaychGet) * [PaychGetWaitReady](#PaychGetWaitReady) * [PaychList](#PaychList) @@ -4505,6 +4506,8 @@ Response: "To": "f01234", "ConfirmedAmt": "0", "PendingAmt": "0", + "NonReservedAmt": "0", + "PendingAvailableAmt": "0", "PendingWaitSentinel": null, "QueuedAmt": "0", "VoucherReedeemedAmt": "0" @@ -4532,6 +4535,8 @@ Response: "To": "f01234", "ConfirmedAmt": "0", "PendingAmt": "0", + "NonReservedAmt": "0", + "PendingAvailableAmt": "0", "PendingWaitSentinel": null, "QueuedAmt": "0", "VoucherReedeemedAmt": "0" @@ -4557,8 +4562,10 @@ Response: } ``` -### PaychGet -There are not yet any comments for this method. +### PaychFund +PaychFund gets or creates a payment channel between address pair. +The specified amount will be added to the channel through on-chain send for future use + Perms: sign @@ -4581,6 +4588,40 @@ Response: } ``` +### PaychGet +PaychGet gets or creates a payment channel between address pair + The specified amount will be reserved for use. If there aren't enough non-reserved funds + available, funds will be added through an on-chain message. + - When opts.OffChain is true, this call will not cause any messages to be sent to the chain (no automatic + channel creation/funds adding). If the operation can't be performed without sending a message an error will be + returned. Note that even when this option is specified, this call can be blocked by previous operations on the + channel waiting for on-chain operations. + + +Perms: sign + +Inputs: +```json +[ + "f01234", + "f01234", + "0", + { + "OffChain": true + } +] +``` + +Response: +```json +{ + "Channel": "f01234", + "WaitSentinel": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +} +``` + ### PaychGetWaitReady diff --git a/documentation/en/cli-lotus.md b/documentation/en/cli-lotus.md index ae575907f5c..a9697f3aff0 100644 --- a/documentation/en/cli-lotus.md +++ b/documentation/en/cli-lotus.md @@ -63,6 +63,7 @@ OPTIONS: --import-chain value on first run, load chain from given file or url and validate --import-snapshot value import chain state from a given chain export file or url --halt-after-import halt the process after importing chain from file (default: false) + --lite start lotus in lite mode (default: false) --pprof value specify name of file for writing cpu profile to --profile value specify type of node --manage-fdlimit manage open file limit (default: true) @@ -1353,6 +1354,7 @@ USAGE: OPTIONS: --restart-retrievals restart stalled retrieval deals on this payment channel (default: true) + --reserve mark funds as reserved (default: false) --help, -h show help (default: false) ``` diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 0bc22c9d772..ad917814b0d 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -121,6 +121,14 @@ # env var: LOTUS_CLIENT_SIMULTANEOUSTRANSFERSFORRETRIEVAL #SimultaneousTransfersForRetrieval = 20 + # Require that retrievals perform no on-chain operations. Paid retrievals + # without existing payment channels with available funds will fail instead + # of automatically performing on-chain operations. + # + # type: bool + # env var: LOTUS_CLIENT_OFFCHAINRETRIEVAL + #OffChainRetrieval = false + [Wallet] # type: string diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 897ba4f0611..28e0715591c 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -763,7 +763,7 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error { func (m *Manager) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (out storage.ReplicaUpdateOut, err error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - log.Errorf("manager is doing replica update") + log.Debugf("manager is doing replica update") wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTReplicaUpdate, sector, pieces) if err != nil { return storage.ReplicaUpdateOut{}, xerrors.Errorf("getWork: %w", err) diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 572d482edc8..2ca86f5465a 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -102,7 +102,13 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, go func() { for _, call := range unfinished { - err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.New("worker restarted")) + hostname, osErr := os.Hostname() + if osErr != nil { + log.Errorf("get hostname err: %+v", err) + hostname = "" + } + + err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.Errorf("worker [Hostname: %s] restarted", hostname)) // TODO: Handle restarting PC1 once support is merged @@ -263,6 +269,15 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r } } + if err != nil { + hostname, osErr := os.Hostname() + if osErr != nil { + log.Errorf("get hostname err: %+v", err) + } + + err = xerrors.Errorf("%w [Hostname: %s]", err.Error(), hostname) + } + if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) { if err := l.ct.onReturned(ci); err != nil { log.Errorf("tracking call (done): %+v", err) diff --git a/extern/storage-sealing/states_replica_update.go b/extern/storage-sealing/states_replica_update.go index 8683a11d84d..bede7a5fa81 100644 --- a/extern/storage-sealing/states_replica_update.go +++ b/extern/storage-sealing/states_replica_update.go @@ -168,7 +168,7 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec log.Errorf("no good address to send replica update message from: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } - mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveReplicaUpdates, big.Zero(), big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) + mcid, err := m.Api.SendMsg(ctx.Context(), from, m.maddr, miner.Methods.ProveReplicaUpdates, collateral, big.Int(m.feeCfg.MaxCommitGasFee), enc.Bytes()) if err != nil { log.Errorf("handleSubmitReplicaUpdate: error sending message: %+v", err) return ctx.Send(SectorSubmitReplicaUpdateFailed{}) diff --git a/itests/paych_api_test.go b/itests/paych_api_test.go index 7e135a9bea7..19e18106415 100644 --- a/itests/paych_api_test.go +++ b/itests/paych_api_test.go @@ -65,7 +65,9 @@ func TestPaymentChannelsAPI(t *testing.T) { require.NoError(t, err) channelAmt := int64(7000) - channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt)) + channelInfo, err := paymentCreator.PaychGet(ctx, createrAddr, receiverAddr, abi.NewTokenAmount(channelAmt), api.PaychGetOpts{ + OffChain: false, + }) require.NoError(t, err) channel, err := paymentCreator.PaychGetWaitReady(ctx, channelInfo.WaitSentinel) diff --git a/itests/paych_cli_test.go b/itests/paych_cli_test.go index c3f9deeba8a..781be80a83b 100644 --- a/itests/paych_cli_test.go +++ b/itests/paych_cli_test.go @@ -144,10 +144,10 @@ func TestPaymentChannelStatus(t *testing.T) { require.True(t, stateCreating || stateCreated) channelAmtAtto := types.BigMul(types.NewInt(channelAmt), types.NewInt(build.FilecoinPrecision)) - channelAmtStr := fmt.Sprintf("%d", channelAmtAtto) + channelAmtStr := fmt.Sprintf("%s", types.FIL(channelAmtAtto)) if stateCreating { // If we're in the creating state (most likely) the amount should be pending - require.Regexp(t, regexp.MustCompile("Pending.*"+channelAmtStr), out) + require.Regexp(t, regexp.MustCompile("Pending Amt.*"+channelAmtStr), out) } // Wait for create channel to complete @@ -170,7 +170,7 @@ func TestPaymentChannelStatus(t *testing.T) { out = creatorCLI.RunCmd("paych", "status", chstr) fmt.Println(out) voucherAmtAtto := types.BigMul(types.NewInt(voucherAmt), types.NewInt(build.FilecoinPrecision)) - voucherAmtStr := fmt.Sprintf("%d", voucherAmtAtto) + voucherAmtStr := fmt.Sprintf("%s", types.FIL(voucherAmtAtto)) // Output should include voucher amount require.Regexp(t, regexp.MustCompile("Voucher.*"+voucherAmtStr), out) } diff --git a/markets/retrievaladapter/client.go b/markets/retrievaladapter/client.go index 1bef23e1296..74f02570b9e 100644 --- a/markets/retrievaladapter/client.go +++ b/markets/retrievaladapter/client.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" "github.com/multiformats/go-multiaddr" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" @@ -17,6 +18,8 @@ import ( ) type retrievalClientNode struct { + forceOffChain bool + chainAPI full.ChainAPI payAPI payapi.PaychAPI stateAPI full.StateAPI @@ -24,8 +27,13 @@ type retrievalClientNode struct { // NewRetrievalClientNode returns a new node adapter for a retrieval client that talks to the // Lotus Node -func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode { - return &retrievalClientNode{payAPI: payAPI, chainAPI: chainAPI, stateAPI: stateAPI} +func NewRetrievalClientNode(forceOffChain bool, payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stateAPI full.StateAPI) retrievalmarket.RetrievalClientNode { + return &retrievalClientNode{ + forceOffChain: forceOffChain, + chainAPI: chainAPI, + payAPI: payAPI, + stateAPI: stateAPI, + } } // GetOrCreatePaymentChannel sets up a new payment channel if one does not exist @@ -34,10 +42,14 @@ func NewRetrievalClientNode(payAPI payapi.PaychAPI, chainAPI full.ChainAPI, stat func (rcn *retrievalClientNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable abi.TokenAmount, tok shared.TipSetToken) (address.Address, cid.Cid, error) { // TODO: respect the provided TipSetToken (a serialized TipSetKey) when // querying the chain - ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable) + ci, err := rcn.payAPI.PaychGet(ctx, clientAddress, minerAddress, clientFundsAvailable, api.PaychGetOpts{ + OffChain: rcn.forceOffChain, + }) if err != nil { + log.Errorw("paych get failed", "error", err) return address.Undef, cid.Undef, err } + return ci.Channel, ci.WaitSentinel, nil } diff --git a/node/builder_chain.go b/node/builder_chain.go index 1e568397ea6..afee868fd15 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -121,7 +121,7 @@ var ChainNode = Options( // Markets (retrieval) Override(new(discovery.PeerResolver), modules.RetrievalResolver), Override(new(retrievalmarket.BlockstoreAccessor), modules.RetrievalBlockstoreAccessor), - Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient), + Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(false)), Override(new(dtypes.ClientDataTransfer), modules.NewClientGraphsyncDataTransfer), // Markets (storage) @@ -223,6 +223,8 @@ func ConfigFullNode(c interface{}) Option { ), Override(new(dtypes.Graphsync), modules.Graphsync(cfg.Client.SimultaneousTransfersForStorage, cfg.Client.SimultaneousTransfersForRetrieval)), + Override(new(retrievalmarket.RetrievalClient), modules.RetrievalClient(cfg.Client.OffChainRetrieval)), + If(cfg.Wallet.RemoteBackend != "", Override(new(*remotewallet.RemoteWallet), remotewallet.SetupRemoteWallet(cfg.Wallet.RemoteBackend)), ), diff --git a/node/config/def_test.go b/node/config/def_test.go index c25b255bfd3..a7a0e77cab1 100644 --- a/node/config/def_test.go +++ b/node/config/def_test.go @@ -55,5 +55,6 @@ func TestDefaultMinerRoundtrip(t *testing.T) { func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) { subject := DefaultStorageMiner() - require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.PubSubTopic) + require.False(t, subject.IndexProvider.Enable) + require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName) } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index c9b6357d39d..ccee363b49d 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -105,6 +105,14 @@ and storage providers for storage deals`, Comment: `The maximum number of simultaneous data transfers between the client and storage providers for retrieval deals`, }, + { + Name: "OffChainRetrieval", + Type: "bool", + + Comment: `Require that retrievals perform no on-chain operations. Paid retrievals +without existing payment channels with available funds will fail instead +of automatically performing on-chain operations.`, + }, }, "Common": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index c70d99a825a..55f924f2ae4 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -417,6 +417,11 @@ type Client struct { // The maximum number of simultaneous data transfers between the client // and storage providers for retrieval deals SimultaneousTransfersForRetrieval uint64 + + // Require that retrievals perform no on-chain operations. Paid retrievals + // without existing payment channels with available funds will fail instead + // of automatically performing on-chain operations. + OffChainRetrieval bool } type Wallet struct { diff --git a/node/impl/paych/paych.go b/node/impl/paych/paych.go index df3b1e3e490..d338c6032fd 100644 --- a/node/impl/paych/paych.go +++ b/node/impl/paych/paych.go @@ -22,8 +22,26 @@ type PaychAPI struct { PaychMgr *paychmgr.Manager } -func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { - ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt) +func (a *PaychAPI) PaychGet(ctx context.Context, from, to address.Address, amt types.BigInt, opts api.PaychGetOpts) (*api.ChannelInfo, error) { + ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, paychmgr.GetOpts{ + Reserve: true, + OffChain: opts.OffChain, + }) + if err != nil { + return nil, err + } + + return &api.ChannelInfo{ + Channel: ch, + WaitSentinel: mcid, + }, nil +} + +func (a *PaychAPI) PaychFund(ctx context.Context, from, to address.Address, amt types.BigInt) (*api.ChannelInfo, error) { + ch, mcid, err := a.PaychMgr.GetPaych(ctx, from, to, amt, paychmgr.GetOpts{ + Reserve: false, + OffChain: false, + }) if err != nil { return nil, err } @@ -55,7 +73,7 @@ func (a *PaychAPI) PaychNewPayment(ctx context.Context, from, to address.Address // TODO: Fix free fund tracking in PaychGet // TODO: validate voucher spec before locking funds - ch, err := a.PaychGet(ctx, from, to, amount) + ch, err := a.PaychGet(ctx, from, to, amount, api.PaychGetOpts{OffChain: false}) if err != nil { return nil, err } diff --git a/node/modules/client.go b/node/modules/client.go index 48f9dc3d778..1e74182041d 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -202,26 +202,28 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT } // RetrievalClient creates a new retrieval client attached to the client blockstore -func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, +func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { - - adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI) - network := rmnet.NewFromLibp2pHost(h) - ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) - client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor) - if err != nil { - return nil, err + return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, + ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI) + network := rmnet.NewFromLibp2pHost(h) + ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) + client, err := retrievalimpl.NewClient(network, dt, adapter, resolver, ds, accessor) + if err != nil { + return nil, err + } + client.OnReady(marketevents.ReadyLogger("retrieval client")) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + client.SubscribeToEvents(marketevents.RetrievalClientLogger) + + evtType := j.RegisterEventType("markets/retrieval/client", "state_change") + client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType)) + + return client.Start(ctx) + }, + }) + return client, nil } - client.OnReady(marketevents.ReadyLogger("retrieval client")) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - client.SubscribeToEvents(marketevents.RetrievalClientLogger) - - evtType := j.RegisterEventType("markets/retrieval/client", "state_change") - client.SubscribeToEvents(markets.RetrievalClientJournaler(j, evtType)) - - return client.Start(ctx) - }, - }) - return client, nil } diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index c13fcb7c51a..23c718048ed 100644 --- a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -2,14 +2,14 @@ package modules import ( "context" - "github.com/ipfs/go-datastore" - "go.uber.org/fx" "github.com/filecoin-project/go-address" provider "github.com/filecoin-project/index-provider" "github.com/filecoin-project/index-provider/engine" + "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/libp2p/go-libp2p-core/host" + "go.uber.org/fx" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/node/config" diff --git a/paychmgr/cbor_gen.go b/paychmgr/cbor_gen.go index caa4143a257..428c09a9e05 100644 --- a/paychmgr/cbor_gen.go +++ b/paychmgr/cbor_gen.go @@ -196,7 +196,7 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{172}); err != nil { + if _, err := w.Write([]byte{174}); err != nil { return err } @@ -346,6 +346,38 @@ func (t *ChannelInfo) MarshalCBOR(w io.Writer) error { return err } + // t.AvailableAmount (big.Int) (struct) + if len("AvailableAmount") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"AvailableAmount\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("AvailableAmount"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("AvailableAmount")); err != nil { + return err + } + + if err := t.AvailableAmount.MarshalCBOR(w); err != nil { + return err + } + + // t.PendingAvailableAmount (big.Int) (struct) + if len("PendingAvailableAmount") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PendingAvailableAmount\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("PendingAvailableAmount"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PendingAvailableAmount")); err != nil { + return err + } + + if err := t.PendingAvailableAmount.MarshalCBOR(w); err != nil { + return err + } + // t.PendingAmount (big.Int) (struct) if len("PendingAmount") > cbg.MaxLength { return xerrors.Errorf("Value in field \"PendingAmount\" was too long") @@ -577,6 +609,26 @@ func (t *ChannelInfo) UnmarshalCBOR(r io.Reader) error { return xerrors.Errorf("unmarshaling t.Amount: %w", err) } + } + // t.AvailableAmount (big.Int) (struct) + case "AvailableAmount": + + { + + if err := t.AvailableAmount.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.AvailableAmount: %w", err) + } + + } + // t.PendingAvailableAmount (big.Int) (struct) + case "PendingAvailableAmount": + + { + + if err := t.PendingAvailableAmount.UnmarshalCBOR(br); err != nil { + return xerrors.Errorf("unmarshaling t.PendingAvailableAmount: %w", err) + } + } // t.PendingAmount (big.Int) (struct) case "PendingAmount": diff --git a/paychmgr/manager.go b/paychmgr/manager.go index e0fcd7a754c..ea77c67efbe 100644 --- a/paychmgr/manager.go +++ b/paychmgr/manager.go @@ -101,13 +101,22 @@ func (pm *Manager) Stop() error { return nil } -func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt) (address.Address, cid.Cid, error) { +type GetOpts struct { + Reserve bool + OffChain bool +} + +func (pm *Manager) GetPaych(ctx context.Context, from, to address.Address, amt types.BigInt, opts GetOpts) (address.Address, cid.Cid, error) { + if !opts.Reserve && opts.OffChain { + return address.Undef, cid.Undef, xerrors.Errorf("can't fund payment channels without on-chain operations") + } + chanAccessor, err := pm.accessorByFromTo(from, to) if err != nil { return address.Undef, cid.Undef, err } - return chanAccessor.getPaych(ctx, amt) + return chanAccessor.getPaych(ctx, amt, opts) } func (pm *Manager) AvailableFunds(ctx context.Context, ch address.Address) (*api.ChannelAvailableFunds, error) { @@ -142,6 +151,8 @@ func (pm *Manager) AvailableFundsByFromTo(ctx context.Context, from address.Addr To: to, ConfirmedAmt: types.NewInt(0), PendingAmt: types.NewInt(0), + NonReservedAmt: types.NewInt(0), + PendingAvailableAmt: types.NewInt(0), PendingWaitSentinel: nil, QueuedAmt: types.NewInt(0), VoucherReedeemedAmt: types.NewInt(0), diff --git a/paychmgr/paych.go b/paychmgr/paych.go index 16c6604c6ca..5fdb4d8842e 100644 --- a/paychmgr/paych.go +++ b/paychmgr/paych.go @@ -106,7 +106,7 @@ func (ca *channelAccessor) outboundActiveByFromTo(ctx context.Context, from, to ca.lk.Lock() defer ca.lk.Unlock() - return ca.store.OutboundActiveByFromTo(ctx, from, to) + return ca.store.OutboundActiveByFromTo(ctx, ca.api, from, to) } // createVoucher creates a voucher with the given specification, setting its diff --git a/paychmgr/paych_test.go b/paychmgr/paych_test.go index e798ba5854d..e485c4e83c8 100644 --- a/paychmgr/paych_test.go +++ b/paychmgr/paych_test.go @@ -475,18 +475,18 @@ func TestAddVoucherInboundWalletKey(t *testing.T) { toAcct := tutils.NewActorAddr(t, "toAct") // Create an actor for the channel in state - act := &types.Actor{ - Code: builtin.AccountActorCodeID, - Head: cid.Cid{}, - Nonce: 0, - Balance: types.NewInt(20), - } mock := newMockManagerAPI() mock.setAccountAddress(fromAcct, from) mock.setAccountAddress(toAcct, to) + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } mock.setPaychState(ch, act, paychmock.NewMockPayChState(fromAcct, toAcct, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) // Create a manager diff --git a/paychmgr/paychget_test.go b/paychmgr/paychget_test.go index c9dc48b0519..0688301e883 100644 --- a/paychmgr/paychget_test.go +++ b/paychmgr/paychget_test.go @@ -25,6 +25,23 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +var onChainReserve = GetOpts{ + Reserve: true, + OffChain: false, +} +var onChainNoReserve = GetOpts{ + Reserve: false, + OffChain: false, +} +var offChainReserve = GetOpts{ + Reserve: true, + OffChain: true, +} +var offChainNoReserve = GetOpts{ + Reserve: false, + OffChain: true, +} + func testChannelResponse(t *testing.T, ch address.Address) types.MessageReceipt { createChannelRet := init2.ExecReturn{ IDAddress: ch, @@ -55,7 +72,7 @@ func TestPaychGetCreateChannelMsg(t *testing.T) { require.NoError(t, err) amt := big.NewInt(10) - ch, mcid, err := mgr.GetPaych(ctx, from, to, amt) + ch, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) require.Equal(t, address.Undef, ch) @@ -65,6 +82,42 @@ func TestPaychGetCreateChannelMsg(t *testing.T) { require.Equal(t, amt, pushedMsg.Message.Value) } +func TestPaychGetOffchainNoReserveFails(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + amt := big.NewInt(10) + _, _, err = mgr.GetPaych(ctx, from, to, amt, offChainNoReserve) + require.Error(t, err) +} + +func TestPaychGetCreateOffchainReserveFails(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + amt := big.NewInt(10) + _, _, err = mgr.GetPaych(ctx, from, to, amt, offChainReserve) + require.Error(t, err) +} + // TestPaychGetCreateChannelThenAddFunds tests creating a channel and then // adding funds to it func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { @@ -79,12 +132,20 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { mock := newMockManagerAPI() defer mock.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // Send create message for a channel with value 10 amt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Should have no channels yet (message sent but channel not created) @@ -101,7 +162,7 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { // 2. Request add funds - should block until create channel has completed amt2 := big.NewInt(5) - ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2) + ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) // 4. This GetPaych should return after create channel from first // GetPaych completes @@ -155,6 +216,82 @@ func TestPaychGetCreateChannelThenAddFunds(t *testing.T) { <-done } +func TestPaychGetCreatePrefundedChannelThenAddFunds(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + amt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainNoReserve) + require.NoError(t, err) + + // Should have no channels yet (message sent but channel not created) + cis, err := mgr.ListChannels(ctx) + require.NoError(t, err) + require.Len(t, cis, 0) + + // 1. Set up create channel response (sent in response to WaitForMsg()) + response := testChannelResponse(t, ch) + + done := make(chan struct{}) + go func() { + defer close(done) + + // 2. Request add funds - shouldn't block + amt2 := big.NewInt(3) + ch2, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, offChainReserve) + + // 4. This GetPaych should return after create channel from first + // GetPaych completes + require.NoError(t, err) + + // Expect the channel to be the same + require.Equal(t, ch, ch2) + require.Equal(t, cid.Undef, addFundsMsgCid) + + // Should have one channel, whose address is the channel that was created + cis, err := mgr.ListChannels(ctx) + require.NoError(t, err) + require.Len(t, cis, 1) + require.Equal(t, ch, cis[0]) + + // Amount should be amount sent to first GetPaych (to create + // channel). + // PendingAmount should be zero, AvailableAmount should be Amount minus what we requested + + ci, err := mgr.GetChannelInfo(ctx, ch) + require.NoError(t, err) + require.EqualValues(t, 10, ci.Amount.Int64()) + require.EqualValues(t, 0, ci.PendingAmount.Int64()) + require.EqualValues(t, 7, ci.AvailableAmount.Int64()) + require.Nil(t, ci.CreateMsg) + require.Nil(t, ci.AddFundsMsg) + }() + + // 3. Send create channel response + mock.receiveMsgResponse(createMsgCid, response) + + <-done +} + // TestPaychGetCreateChannelWithErrorThenCreateAgain tests that if an // operation is queued up behind a create channel operation, and the create // channel fails, then the waiting operation can succeed. @@ -174,7 +311,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // 1. Set up create channel response (sent in response to WaitForMsg()) @@ -192,7 +329,7 @@ func TestPaychGetCreateChannelWithErrorThenCreateAgain(t *testing.T) { // Because first channel create fails, this request // should be for channel create again. amt2 := big.NewInt(5) - ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) + ch2, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) require.Equal(t, address.Undef, ch2) @@ -240,7 +377,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) { // Send create message for a channel amt := big.NewInt(10) - _, mcid, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Send error create channel response @@ -251,7 +388,7 @@ func TestPaychGetRecoverAfterError(t *testing.T) { // Send create message for a channel again amt2 := big.NewInt(7) - _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) // Send success create channel response @@ -288,12 +425,20 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) { mock := newMockManagerAPI() defer mock.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // Send create message for a channel amt := big.NewInt(10) - _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Send success create channel response @@ -302,7 +447,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) { // Send add funds message for channel amt2 := big.NewInt(5) - _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) // Send error add funds response @@ -329,7 +474,7 @@ func TestPaychGetRecoverAfterAddFundsError(t *testing.T) { // Send add funds message for channel again amt3 := big.NewInt(2) - _, mcid3, err := mgr.GetPaych(ctx, from, to, amt3) + _, mcid3, err := mgr.GetPaych(ctx, from, to, amt3, onChainReserve) require.NoError(t, err) // Send success add funds response @@ -375,7 +520,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) { // Send create message for a channel with value 10 amt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Simulate shutting down system @@ -385,6 +530,14 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) { mock2 := newMockManagerAPI() defer mock2.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock2.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr2, err := newManager(store, mock2) require.NoError(t, err) @@ -402,7 +555,7 @@ func TestPaychGetRestartAfterCreateChannelMsg(t *testing.T) { // 2. Request add funds - should block until create channel has completed amt2 := big.NewInt(5) - ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2) + ch2, addFundsMsgCid, err := mgr2.GetPaych(ctx, from, to, amt2, onChainReserve) // 4. This GetPaych should return after create channel from first // GetPaych completes @@ -450,12 +603,20 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) { mock := newMockManagerAPI() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // Send create message for a channel amt := big.NewInt(10) - _, mcid1, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid1, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Send success create channel response @@ -464,7 +625,7 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) { // Send add funds message for channel amt2 := big.NewInt(5) - _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) // Simulate shutting down system @@ -474,6 +635,8 @@ func TestPaychGetRestartAfterAddFundsMsg(t *testing.T) { mock2 := newMockManagerAPI() defer mock2.close() + mock2.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr2, err := newManager(store, mock2) require.NoError(t, err) @@ -510,19 +673,27 @@ func TestPaychGetWait(t *testing.T) { from := tutils.NewIDAddr(t, 101) to := tutils.NewIDAddr(t, 102) + expch := tutils.NewIDAddr(t, 100) mock := newMockManagerAPI() defer mock.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(expch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // 1. Get amt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) - expch := tutils.NewIDAddr(t, 100) go func() { // 3. Send response response := testChannelResponse(t, expch) @@ -542,7 +713,7 @@ func TestPaychGetWait(t *testing.T) { // Request add funds amt2 := big.NewInt(15) - _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2) + _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) go func() { @@ -577,7 +748,7 @@ func TestPaychGetWaitErr(t *testing.T) { // 1. Create channel amt := big.NewInt(10) - _, mcid, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) done := make(chan address.Address) @@ -624,7 +795,7 @@ func TestPaychGetWaitCtx(t *testing.T) { require.NoError(t, err) amt := big.NewInt(10) - _, mcid, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // When the context is cancelled, should unblock wait @@ -651,12 +822,20 @@ func TestPaychGetMergeAddFunds(t *testing.T) { mock := newMockManagerAPI() defer mock.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // Send create message for a channel with value 10 createAmt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) require.NoError(t, err) // Queue up two add funds requests behind create channel @@ -674,7 +853,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) { // Request add funds - should block until create channel has completed var err error - addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1) + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve) require.NoError(t, err) }() @@ -683,7 +862,7 @@ func TestPaychGetMergeAddFunds(t *testing.T) { // Request add funds again - should merge with waiting add funds request var err error - addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2) + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve) require.NoError(t, err) }() // Wait for add funds requests to be queued up @@ -736,10 +915,111 @@ func TestPaychGetMergeAddFunds(t *testing.T) { require.Equal(t, types.BigAdd(addFundsAmt1, addFundsAmt2), addFundsMsg.Message.Value) } -// TestPaychGetMergeAddFundsCtxCancelOne tests that when a queued add funds -// request is cancelled, its amount is removed from the total merged add funds -func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { - //stm: @TOKEN_PAYCH_WAIT_READY_001 +func TestPaychGetMergePrefundAndReserve(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) + require.NoError(t, err) + + // Queue up two add funds requests behind create channel + var addFundsSent sync.WaitGroup + addFundsSent.Add(2) + + addFundsAmt1 := big.NewInt(5) // 1 prefunds + addFundsAmt2 := big.NewInt(3) // 2 reserves + var addFundsCh1 address.Address + var addFundsCh2 address.Address + var addFundsMcid1 cid.Cid + var addFundsMcid2 cid.Cid + go func() { + defer addFundsSent.Done() + + // Request add funds - should block until create channel has completed + var err error + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve) + require.NoError(t, err) + }() + + go func() { + defer addFundsSent.Done() + + // Request add funds again - should merge with waiting add funds request + var err error + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve) + require.NoError(t, err) + }() + // Wait for add funds requests to be queued up + waitForQueueSize(t, mgr, from, to, 2) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Wait for create channel response + chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + require.Equal(t, ch, chres) + + // Wait for add funds requests to be sent + addFundsSent.Wait() + + // Expect add funds requests to have same channel as create channel and + // same message cid as each other (because they should have been merged) + require.Equal(t, ch, addFundsCh1) + require.Equal(t, ch, addFundsCh2) + require.Equal(t, addFundsMcid1, addFundsMcid2) + + // Send success add funds response + mock.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{ + ExitCode: 0, + Return: []byte{}, + }) + + // Wait for add funds response + addFundsCh, err := mgr.GetPaychWaitReady(ctx, addFundsMcid1) + require.NoError(t, err) + require.Equal(t, ch, addFundsCh) + + // Make sure that one create channel message and one add funds message was + // sent + require.Equal(t, 2, mock.pushedMessageCount()) + + // Check create message amount is correct + createMsg := mock.pushedMessages(createMsgCid) + require.Equal(t, from, createMsg.Message.From) + require.Equal(t, lotusinit.Address, createMsg.Message.To) + require.Equal(t, createAmt, createMsg.Message.Value) + + // Check merged add funds amount is the sum of the individual + // amounts + addFundsMsg := mock.pushedMessages(addFundsMcid1) + require.Equal(t, from, addFundsMsg.Message.From) + require.Equal(t, ch, addFundsMsg.Message.To) + require.Equal(t, types.BigAdd(addFundsAmt1, addFundsAmt2), addFundsMsg.Message.Value) +} + +func TestPaychGetMergePrefundAndReservePrefunded(t *testing.T) { ctx := context.Background() store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) @@ -750,12 +1030,393 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { mock := newMockManagerAPI() defer mock.close() + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve) + require.NoError(t, err) + + // Queue up two add funds requests behind create channel + var addFundsSent sync.WaitGroup + addFundsSent.Add(2) + + addFundsAmt1 := big.NewInt(5) // 1 prefunds + addFundsAmt2 := big.NewInt(3) // 2 reserves + var addFundsCh1 address.Address + var addFundsCh2 address.Address + var addFundsMcid1 cid.Cid + var addFundsMcid2 cid.Cid + go func() { + defer addFundsSent.Done() + + // Request add funds - should block until create channel has completed + var err error + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainNoReserve) + require.NoError(t, err) + }() + + go func() { + defer addFundsSent.Done() + + // Request add funds again - should merge with waiting add funds request + var err error + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve) + require.NoError(t, err) + }() + // Wait for add funds requests to be queued up + waitForQueueSize(t, mgr, from, to, 2) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Wait for create channel response + chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + require.Equal(t, ch, chres) + + // Wait for add funds requests to be sent + addFundsSent.Wait() + + // Expect add funds requests to have same channel as create channel and + // same message cid as each other (because they should have been merged) + require.Equal(t, ch, addFundsCh1) + require.Equal(t, ch, addFundsCh2) + require.NotEqual(t, cid.Undef, addFundsMcid1) + require.Equal(t, cid.Undef, addFundsMcid2) + + // Send success add funds response + mock.receiveMsgResponse(addFundsMcid1, types.MessageReceipt{ + ExitCode: 0, + Return: []byte{}, + }) + + // Wait for add funds response + addFundsCh, err := mgr.GetPaychWaitReady(ctx, addFundsMcid1) + require.NoError(t, err) + require.Equal(t, ch, addFundsCh) + + // Make sure that one create channel message and one add funds message was + // sent + require.Equal(t, 2, mock.pushedMessageCount()) + + // Check create message amount is correct + createMsg := mock.pushedMessages(createMsgCid) + require.Equal(t, from, createMsg.Message.From) + require.Equal(t, lotusinit.Address, createMsg.Message.To) + require.Equal(t, createAmt, createMsg.Message.Value) + + // Check merged add funds amount is the sum of the individual + // amounts + addFundsMsg := mock.pushedMessages(addFundsMcid1) + require.Equal(t, from, addFundsMsg.Message.From) + require.Equal(t, ch, addFundsMsg.Message.To) + require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value) +} + +func TestPaychGetMergePrefundAndReservePrefundedOneOffchain(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve) + require.NoError(t, err) + + // Queue up two add funds requests behind create channel + var addFundsSent sync.WaitGroup + addFundsSent.Add(2) + + addFundsAmt1 := big.NewInt(5) // 1 reserves + addFundsAmt2 := big.NewInt(3) // 2 reserves + var addFundsCh1 address.Address + var addFundsCh2 address.Address + var addFundsMcid1 cid.Cid + var addFundsMcid2 cid.Cid + go func() { + defer addFundsSent.Done() + + // Request add funds - should block until create channel has completed + var err error + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve) + require.NoError(t, err) + }() + + go func() { + defer addFundsSent.Done() + + // Request add funds again - should merge with waiting add funds request + var err error + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve) + require.NoError(t, err) + }() + // Wait for add funds requests to be queued up + waitForQueueSize(t, mgr, from, to, 2) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Wait for create channel response + chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + require.Equal(t, ch, chres) + + // Wait for add funds requests to be sent + addFundsSent.Wait() + + // Expect add funds requests to have same channel as create channel and + // same message cid as each other (because they should have been merged) + require.Equal(t, ch, addFundsCh1) + require.Equal(t, ch, addFundsCh2) + require.Equal(t, cid.Undef, addFundsMcid1) + require.Equal(t, cid.Undef, addFundsMcid2) + + // Make sure that one create channel message was sent + require.Equal(t, 1, mock.pushedMessageCount()) + + // Check create message amount is correct + createMsg := mock.pushedMessages(createMsgCid) + require.Equal(t, from, createMsg.Message.From) + require.Equal(t, lotusinit.Address, createMsg.Message.To) + require.Equal(t, createAmt, createMsg.Message.Value) +} + +func TestPaychGetMergePrefundAndReservePrefundedBothOffchainOneFail(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainNoReserve) + require.NoError(t, err) + + // Queue up two add funds requests behind create channel + var addFundsSent sync.WaitGroup + addFundsSent.Add(2) + + addFundsAmt1 := big.NewInt(5) // 1 reserves + addFundsAmt2 := big.NewInt(6) // 2 reserves too much + var addFundsCh1 address.Address + var addFundsCh2 address.Address + var addFundsMcid1 cid.Cid + var addFundsMcid2 cid.Cid + go func() { + defer addFundsSent.Done() + + // Request add funds - should block until create channel has completed + var err error + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, offChainReserve) + require.NoError(t, err) + }() + + go func() { + defer addFundsSent.Done() + + // Request add funds again - should merge with waiting add funds request + var err error + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve) + require.Error(t, err) + }() + // Wait for add funds requests to be queued up + waitForQueueSize(t, mgr, from, to, 2) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Wait for create channel response + chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + require.Equal(t, ch, chres) + + // Wait for add funds requests to be sent + addFundsSent.Wait() + + // Expect add funds requests to have same channel as create channel and + // same message cid as each other (because they should have been merged) + require.Equal(t, ch, addFundsCh1) + require.Equal(t, ch, addFundsCh2) + require.Equal(t, cid.Undef, addFundsMcid1) + require.Equal(t, cid.Undef, addFundsMcid2) + + // Make sure that one create channel message was sent + require.Equal(t, 1, mock.pushedMessageCount()) + + // Check create message amount is correct + createMsg := mock.pushedMessages(createMsgCid) + require.Equal(t, from, createMsg.Message.From) + require.Equal(t, lotusinit.Address, createMsg.Message.To) + require.Equal(t, createAmt, createMsg.Message.Value) +} + +func TestPaychGetMergePrefundAndReserveOneOffchainOneFail(t *testing.T) { + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + + mgr, err := newManager(store, mock) + require.NoError(t, err) + + // Send create message for a channel with value 10 + createAmt := big.NewInt(10) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) + require.NoError(t, err) + + // Queue up two add funds requests behind create channel + var addFundsSent sync.WaitGroup + addFundsSent.Add(2) + + addFundsAmt1 := big.NewInt(5) // 1 reserves + addFundsAmt2 := big.NewInt(6) // 2 reserves + var addFundsCh1 address.Address + var addFundsCh2 address.Address + var addFundsMcid1 cid.Cid + var addFundsMcid2 cid.Cid + go func() { + defer addFundsSent.Done() + + // Request add funds - should block until create channel has completed + var err error + addFundsCh1, addFundsMcid1, err = mgr.GetPaych(ctx, from, to, addFundsAmt1, onChainReserve) + require.NoError(t, err) + }() + + go func() { + defer addFundsSent.Done() + + // Request add funds again - should merge with waiting add funds request + var err error + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, offChainReserve) + require.Error(t, err) + }() + // Wait for add funds requests to be queued up + waitForQueueSize(t, mgr, from, to, 2) + + // Send create channel response + response := testChannelResponse(t, ch) + mock.receiveMsgResponse(createMsgCid, response) + + // Wait for create channel response + chres, err := mgr.GetPaychWaitReady(ctx, createMsgCid) + require.NoError(t, err) + require.Equal(t, ch, chres) + + // Wait for add funds requests to be sent + addFundsSent.Wait() + + // Expect add funds requests to have same channel as create channel and + // same message cid as each other (because they should have been merged) + require.Equal(t, ch, addFundsCh1) + require.Equal(t, ch, addFundsCh2) + require.NotEqual(t, cid.Undef, addFundsMcid1) + require.Equal(t, cid.Undef, addFundsMcid2) + + // Make sure that one create channel message was sent + require.Equal(t, 2, mock.pushedMessageCount()) + + // Check create message amount is correct + createMsg := mock.pushedMessages(createMsgCid) + require.Equal(t, from, createMsg.Message.From) + require.Equal(t, lotusinit.Address, createMsg.Message.To) + require.Equal(t, createAmt, createMsg.Message.Value) + + // Check merged add funds amount is the sum of the individual + // amounts + addFundsMsg := mock.pushedMessages(addFundsMcid1) + require.Equal(t, from, addFundsMsg.Message.From) + require.Equal(t, ch, addFundsMsg.Message.To) + require.Equal(t, addFundsAmt1, addFundsMsg.Message.Value) +} + +// TestPaychGetMergeAddFundsCtxCancelOne tests that when a queued add funds +// request is cancelled, its amount is removed from the total merged add funds +func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { + //stm: @TOKEN_PAYCH_WAIT_READY_001 + ctx := context.Background() + store := NewStore(ds_sync.MutexWrap(ds.NewMapDatastore())) + + ch := tutils.NewIDAddr(t, 100) + from := tutils.NewIDAddr(t, 101) + to := tutils.NewIDAddr(t, 102) + + mock := newMockManagerAPI() + defer mock.close() + + act := &types.Actor{ + Code: builtin.AccountActorCodeID, + Head: cid.Cid{}, + Nonce: 0, + Balance: types.NewInt(20), + } + mock.setPaychState(ch, act, paychmock.NewMockPayChState(from, to, abi.ChainEpoch(0), make(map[uint64]paych.LaneState))) + mgr, err := newManager(store, mock) require.NoError(t, err) // Send create message for a channel with value 10 createAmt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) require.NoError(t, err) // Queue up two add funds requests behind create channel @@ -772,7 +1433,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { defer addFundsSent.Done() // Request add funds - should block until create channel has completed - _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1) + _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, addFundsAmt1, onChainReserve) }() go func() { @@ -780,7 +1441,7 @@ func TestPaychGetMergeAddFundsCtxCancelOne(t *testing.T) { // Request add funds again - should merge with waiting add funds request var err error - addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2) + addFundsCh2, addFundsMcid2, err = mgr.GetPaych(ctx, from, to, addFundsAmt2, onChainReserve) require.NoError(t, err) }() // Wait for add funds requests to be queued up @@ -853,7 +1514,7 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) { // Send create message for a channel with value 10 createAmt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) require.NoError(t, err) // Queue up two add funds requests behind create channel @@ -868,14 +1529,14 @@ func TestPaychGetMergeAddFundsCtxCancelAll(t *testing.T) { defer addFundsSent.Done() // Request add funds - should block until create channel has completed - _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5)) + _, _, addFundsErr1 = mgr.GetPaych(addFundsCtx1, from, to, big.NewInt(5), onChainReserve) }() go func() { defer addFundsSent.Done() // Request add funds again - should merge with waiting add funds request - _, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3)) + _, _, addFundsErr2 = mgr.GetPaych(addFundsCtx2, from, to, big.NewInt(3), onChainReserve) }() // Wait for add funds requests to be queued up waitForQueueSize(t, mgr, from, to, 2) @@ -941,7 +1602,7 @@ func TestPaychAvailableFunds(t *testing.T) { // Send create message for a channel with value 10 createAmt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) require.NoError(t, err) // Available funds should reflect create channel message sent @@ -966,7 +1627,7 @@ func TestPaychAvailableFunds(t *testing.T) { // Request add funds - should block until create channel has completed var err error - _, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt) + _, addFundsMcid, err = mgr.GetPaych(ctx, from, to, addFundsAmt, onChainReserve) require.NoError(t, err) }() diff --git a/paychmgr/paychvoucherfunds_test.go b/paychmgr/paychvoucherfunds_test.go index f081ee606ff..dc894a04fd4 100644 --- a/paychmgr/paychvoucherfunds_test.go +++ b/paychmgr/paychvoucherfunds_test.go @@ -47,7 +47,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) { // Send create message for a channel with value 10 createAmt := big.NewInt(10) - _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt) + _, createMsgCid, err := mgr.GetPaych(ctx, from, to, createAmt, onChainReserve) require.NoError(t, err) // Send create channel response @@ -83,7 +83,7 @@ func TestPaychAddVoucherAfterAddFunds(t *testing.T) { require.Equal(t, res.Shortfall, excessAmt) // Add funds so as to cover the voucher shortfall - _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt) + _, addFundsMsgCid, err := mgr.GetPaych(ctx, from, to, excessAmt, onChainReserve) require.NoError(t, err) // Trigger add funds confirmation diff --git a/paychmgr/settle_test.go b/paychmgr/settle_test.go index 4d2393e96bf..ffbffc66029 100644 --- a/paychmgr/settle_test.go +++ b/paychmgr/settle_test.go @@ -30,7 +30,7 @@ func TestPaychSettle(t *testing.T) { require.NoError(t, err) amt := big.NewInt(10) - _, mcid, err := mgr.GetPaych(ctx, from, to, amt) + _, mcid, err := mgr.GetPaych(ctx, from, to, amt, onChainReserve) require.NoError(t, err) // Send channel create response @@ -50,7 +50,7 @@ func TestPaychSettle(t *testing.T) { // (should create a new channel because the previous channel // is settling) amt2 := big.NewInt(5) - _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2) + _, mcid2, err := mgr.GetPaych(ctx, from, to, amt2, onChainReserve) require.NoError(t, err) require.NotEqual(t, cid.Undef, mcid2) diff --git a/paychmgr/simple.go b/paychmgr/simple.go index 502338e2953..3d0992efec7 100644 --- a/paychmgr/simple.go +++ b/paychmgr/simple.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sort" "sync" "github.com/ipfs/go-cid" @@ -32,18 +33,20 @@ type fundsReq struct { ctx context.Context promise chan *paychFundsRes amt types.BigInt + opts GetOpts lk sync.Mutex // merge parent, if this req is part of a merge merge *mergedFundsReq } -func newFundsReq(ctx context.Context, amt types.BigInt) *fundsReq { - promise := make(chan *paychFundsRes) +func newFundsReq(ctx context.Context, amt types.BigInt, opts GetOpts) *fundsReq { + promise := make(chan *paychFundsRes, 1) return &fundsReq{ ctx: ctx, promise: promise, amt: amt, + opts: opts, } } @@ -104,6 +107,19 @@ func newMergedFundsReq(reqs []*fundsReq) *mergedFundsReq { r.setMergeParent(m) } + sort.Slice(m.reqs, func(i, j int) bool { + if m.reqs[i].opts.OffChain != m.reqs[j].opts.OffChain { // off-chain first + return m.reqs[i].opts.OffChain + } + + if m.reqs[i].opts.Reserve != m.reqs[j].opts.Reserve { // non-reserve after off-chain + return m.reqs[i].opts.Reserve + } + + // sort by amount asc (reducing latency for smaller requests) + return m.reqs[i].amt.LessThan(m.reqs[j].amt) + }) + // If the requests were all cancelled while being added, cancel the context // immediately m.checkActive() @@ -135,18 +151,97 @@ func (m *mergedFundsReq) onComplete(res *paychFundsRes) { } // sum is the sum of the amounts in all requests in the merge -func (m *mergedFundsReq) sum() types.BigInt { +func (m *mergedFundsReq) sum() (types.BigInt, types.BigInt) { sum := types.NewInt(0) + avail := types.NewInt(0) + for _, r := range m.reqs { if r.isActive() { sum = types.BigAdd(sum, r.amt) + if !r.opts.Reserve { + avail = types.BigAdd(avail, r.amt) + } + } + } + + return sum, avail +} + +// completeAmount completes first non-reserving requests up to the available amount +func (m *mergedFundsReq) completeAmount(avail types.BigInt, channelInfo *ChannelInfo) (*paychFundsRes, types.BigInt, types.BigInt) { + used, failed := types.NewInt(0), types.NewInt(0) + next := 0 + + // order: [offchain+reserve, !offchain+reserve, !offchain+!reserve] + for i, r := range m.reqs { + if !r.opts.Reserve { + // non-reserving request are put after reserving requests, so we are done here + break + } + + // don't try to fill inactive requests + if !r.isActive() { + continue + } + + if r.amt.GreaterThan(types.BigSub(avail, used)) { + // requests are sorted by amount ascending, so if we hit this, there aren't any more requests we can fill + + if r.opts.OffChain { + // can't fill, so OffChain want an error + if r.isActive() { + failed = types.BigAdd(failed, r.amt) + r.onComplete(&paychFundsRes{ + channel: *channelInfo.Channel, + err: xerrors.Errorf("not enough funds available in the payment channel %s; add funds with 'lotus paych add-funds %s %s %s'", channelInfo.Channel, channelInfo.from(), channelInfo.to(), types.FIL(r.amt).Unitless()), + }) + } + next = i + 1 + continue + } + + break + } + + used = types.BigAdd(used, r.amt) + r.onComplete(&paychFundsRes{channel: *channelInfo.Channel}) + next = i + 1 + } + + m.reqs = m.reqs[next:] + if len(m.reqs) == 0 { + return &paychFundsRes{channel: *channelInfo.Channel}, used, failed + } + return nil, used, failed +} + +func (m *mergedFundsReq) failOffChainNoChannel(from, to address.Address) (*paychFundsRes, types.BigInt) { + next := 0 + freed := types.NewInt(0) + + for i, r := range m.reqs { + if !r.opts.OffChain { + break } + + freed = types.BigAdd(freed, r.amt) + if !r.isActive() { + continue + } + r.onComplete(&paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s %s'", from, to, types.FIL(r.amt).Unitless())}) + next = i + 1 + } + + m.reqs = m.reqs[next:] + if len(m.reqs) == 0 { + return &paychFundsRes{err: xerrors.Errorf("payment channel doesn't exist, create with 'lotus paych add-funds %s %s 0'", from, to)}, freed } - return sum + + return nil, freed } // getPaych ensures that a channel exists between the from and to addresses, -// and adds the given amount of funds. +// and reserves (or adds as available) the given amount of funds. // If the channel does not exist a create channel message is sent and the // message CID is returned. // If the channel does exist an add funds message is sent and both the channel @@ -156,9 +251,9 @@ func (m *mergedFundsReq) sum() types.BigInt { // address and the CID of the new add funds message. // If an operation returns an error, subsequent waiting operations will still // be attempted. -func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt) (address.Address, cid.Cid, error) { +func (ca *channelAccessor) getPaych(ctx context.Context, amt types.BigInt, opts GetOpts) (address.Address, cid.Cid, error) { // Add the request to add funds to a queue and wait for the result - freq := newFundsReq(ctx, amt) + freq := newFundsReq(ctx, amt, opts) ca.enqueue(ctx, freq) select { case res := <-freq.promise: @@ -195,14 +290,14 @@ func (ca *channelAccessor) processQueue(ctx context.Context, channelID string) ( // For example if there are pending requests for 3, 2, 4 then // amt = 3 + 2 + 4 = 9 merged := newMergedFundsReq(ca.fundsReqQueue) - amt := merged.sum() + amt, avail := merged.sum() if amt.IsZero() { // Note: The amount can be zero if requests are cancelled as we're // building the mergedFundsReq return ca.currentAvailableFunds(ctx, channelID, amt) } - res := ca.processTask(merged.ctx, amt) + res := ca.processTask(merged, amt, avail) // If the task is waiting on an external event (eg something to appear on // chain) it will return nil @@ -322,6 +417,8 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID To: channelInfo.to(), ConfirmedAmt: channelInfo.Amount, PendingAmt: channelInfo.PendingAmount, + NonReservedAmt: channelInfo.AvailableAmount, + PendingAvailableAmt: channelInfo.PendingAvailableAmount, PendingWaitSentinel: waitSentinel, QueuedAmt: queuedAmt, VoucherReedeemedAmt: totalRedeemed, @@ -333,18 +430,26 @@ func (ca *channelAccessor) currentAvailableFunds(ctx context.Context, channelID // Note that processTask may be called repeatedly in the same state, and should // return nil if there is no state change to be made (eg when waiting for a // message to be confirmed on chain) -func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *paychFundsRes { +func (ca *channelAccessor) processTask(merged *mergedFundsReq, amt, avail types.BigInt) *paychFundsRes { + ctx := merged.ctx + // Get the payment channel for the from/to addresses. // Note: It's ok if we get ErrChannelNotTracked. It just means we need to // create a channel. - channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.from, ca.to) + channelInfo, err := ca.store.OutboundActiveByFromTo(ctx, ca.api, ca.from, ca.to) if err != nil && err != ErrChannelNotTracked { return &paychFundsRes{err: err} } // If a channel has not yet been created, create one. if channelInfo == nil { - mcid, err := ca.createPaych(ctx, amt) + res, freed := merged.failOffChainNoChannel(ca.from, ca.to) + if res != nil { + return res + } + amt = types.BigSub(amt, freed) + + mcid, err := ca.createPaych(ctx, amt, avail) if err != nil { return &paychFundsRes{err: err} } @@ -366,9 +471,16 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p return nil } + // Try to fill requests using available funds, without going to the chain + res, amt := ca.completeAvailable(ctx, merged, channelInfo, amt, avail) + + if res != nil || amt.LessThanEqual(types.NewInt(0)) { + return res + } + // We need to add more funds, so send an add funds message to // cover the amount for this request - mcid, err := ca.addFunds(ctx, channelInfo, amt) + mcid, err := ca.addFunds(ctx, channelInfo, amt, avail) if err != nil { return &paychFundsRes{err: err} } @@ -376,7 +488,7 @@ func (ca *channelAccessor) processTask(ctx context.Context, amt types.BigInt) *p } // createPaych sends a message to create the channel and returns the message cid -func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (cid.Cid, error) { +func (ca *channelAccessor) createPaych(ctx context.Context, amt, avail types.BigInt) (cid.Cid, error) { mb, err := ca.messageBuilder(ctx, ca.from) if err != nil { return cid.Undef, err @@ -393,7 +505,7 @@ func (ca *channelAccessor) createPaych(ctx context.Context, amt types.BigInt) (c mcid := smsg.Cid() // Create a new channel in the store - ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt) + ci, err := ca.store.CreateChannel(ctx, ca.from, ca.to, mcid, amt, avail) if err != nil { log.Errorf("creating channel: %s", err) return cid.Undef, err @@ -452,15 +564,41 @@ func (ca *channelAccessor) waitPaychCreateMsg(ctx context.Context, channelID str ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.Channel = &decodedReturn.RobustAddress channelInfo.Amount = channelInfo.PendingAmount + channelInfo.AvailableAmount = channelInfo.PendingAvailableAmount channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.CreateMsg = nil }) return nil } +// completeAvailable fills reserving fund requests using already available funds, without interacting with the chain +func (ca *channelAccessor) completeAvailable(ctx context.Context, merged *mergedFundsReq, channelInfo *ChannelInfo, amt, av types.BigInt) (*paychFundsRes, types.BigInt) { + toReserve := types.BigSub(amt, av) + avail := types.NewInt(0) + + // reserve at most what we need + ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) { + avail = ci.AvailableAmount + if avail.GreaterThan(toReserve) { + avail = toReserve + } + ci.AvailableAmount = big.Sub(ci.AvailableAmount, avail) + }) + + res, used, failed := merged.completeAmount(avail, channelInfo) + + // return any unused reserved funds (e.g. from cancelled requests) + ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) { + ci.AvailableAmount = types.BigAdd(ci.AvailableAmount, types.BigSub(avail, used)) + }) + + return res, types.BigSub(amt, types.BigAdd(used, failed)) +} + // addFunds sends a message to add funds to the channel and returns the message cid -func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt types.BigInt) (*cid.Cid, error) { +func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) { msg := &types.Message{ To: *channelInfo.Channel, From: channelInfo.Control, @@ -477,6 +615,7 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf // Store the add funds message CID on the channel ca.mutateChannelInfo(ctx, channelInfo.ChannelID, func(ci *ChannelInfo) { ci.PendingAmount = amt + ci.PendingAvailableAmount = avail ci.AddFundsMsg = &mcid }) @@ -492,6 +631,8 @@ func (ca *channelAccessor) addFunds(ctx context.Context, channelInfo *ChannelInf return &mcid, nil } +// TODO func (ca *channelAccessor) freeFunds(ctx context.Context, channelInfo *ChannelInfo, amt, avail types.BigInt) (*cid.Cid, error) { + // waitForAddFundsMsg waits for mcid to appear on chain and returns error, if any func (ca *channelAccessor) waitForAddFundsMsg(ctx context.Context, channelID string, mcid cid.Cid) { err := ca.waitAddFundsMsg(ctx, channelID, mcid) @@ -514,6 +655,7 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil }) @@ -526,7 +668,9 @@ func (ca *channelAccessor) waitAddFundsMsg(ctx context.Context, channelID string // Store updated amount ca.mutateChannelInfo(ctx, channelID, func(channelInfo *ChannelInfo) { channelInfo.Amount = types.BigAdd(channelInfo.Amount, channelInfo.PendingAmount) + channelInfo.AvailableAmount = types.BigAdd(channelInfo.AvailableAmount, channelInfo.PendingAvailableAmount) channelInfo.PendingAmount = big.NewInt(0) + channelInfo.PendingAvailableAmount = big.NewInt(0) channelInfo.AddFundsMsg = nil }) diff --git a/paychmgr/store.go b/paychmgr/store.go index 62849e6bedd..f3c67e5650b 100644 --- a/paychmgr/store.go +++ b/paychmgr/store.go @@ -6,9 +6,8 @@ import ( "errors" "fmt" - "golang.org/x/xerrors" - "github.com/google/uuid" + "golang.org/x/xerrors" "github.com/filecoin-project/lotus/chain/types" @@ -74,6 +73,10 @@ type ChannelInfo struct { // has locally been added to the channel. It should reflect the channel's // Balance on chain as long as all operations occur on the same datastore. Amount types.BigInt + // AvailableAmount indicates how much afil is non-reserved + AvailableAmount types.BigInt + // PendingAvailableAmount is available amount that we're awaiting confirmation of + PendingAvailableAmount types.BigInt // PendingAmount is the amount that we're awaiting confirmation of PendingAmount types.BigInt // CreateMsg is the CID of a pending create message (while waiting for confirmation) @@ -376,7 +379,7 @@ func (ps *Store) GetMessage(ctx context.Context, mcid cid.Cid) (*MsgInfo, error) // OutboundActiveByFromTo looks for outbound channels that have not been // settled, with the given from / to addresses -func (ps *Store) OutboundActiveByFromTo(ctx context.Context, from address.Address, to address.Address) (*ChannelInfo, error) { +func (ps *Store) OutboundActiveByFromTo(ctx context.Context, sma stateManagerAPI, from address.Address, to address.Address) (*ChannelInfo, error) { return ps.findChan(ctx, func(ci *ChannelInfo) bool { if ci.Direction != DirOutbound { return false @@ -384,6 +387,21 @@ func (ps *Store) OutboundActiveByFromTo(ctx context.Context, from address.Addres if ci.Settling { return false } + + if ci.Channel != nil { + _, st, err := sma.GetPaychState(ctx, *ci.Channel, nil) + if err != nil { + return false + } + sat, err := st.SettlingAt() + if err != nil { + return false + } + if sat != 0 { + return false + } + } + return ci.Control == from && ci.Target == to }) } @@ -416,14 +434,15 @@ func (ps *Store) ByChannelID(ctx context.Context, channelID string) (*ChannelInf } // CreateChannel creates an outbound channel for the given from / to -func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt types.BigInt) (*ChannelInfo, error) { +func (ps *Store) CreateChannel(ctx context.Context, from address.Address, to address.Address, createMsgCid cid.Cid, amt, avail types.BigInt) (*ChannelInfo, error) { ci := &ChannelInfo{ - Direction: DirOutbound, - NextLane: 0, - Control: from, - Target: to, - CreateMsg: &createMsgCid, - PendingAmount: amt, + Direction: DirOutbound, + NextLane: 0, + Control: from, + Target: to, + CreateMsg: &createMsgCid, + PendingAmount: amt, + PendingAvailableAmount: avail, } // Save the new channel @@ -497,5 +516,11 @@ func unmarshallChannelInfo(stored *ChannelInfo, value []byte) (*ChannelInfo, err stored.Channel = nil } + // backwards compat + if stored.AvailableAmount.Int == nil { + stored.AvailableAmount = types.NewInt(0) + stored.PendingAvailableAmount = types.NewInt(0) + } + return stored, nil } diff --git a/scripts/snap-lotus-entrypoint.sh b/scripts/snap-lotus-entrypoint.sh new file mode 100755 index 00000000000..a3ab04c5b34 --- /dev/null +++ b/scripts/snap-lotus-entrypoint.sh @@ -0,0 +1,10 @@ +LOTUS_IMPORT_SNAPSHOT="https://fil-chain-snapshots-fallback.s3.amazonaws.com/mainnet/minimal_finality_stateroots_latest.car" +LOTUS_BINARY=$(dirname "$0")/lotus +GATE="$LOTUS_PATH"/date_initialized +if [ ! -f "$GATE" ]; then + echo importing minimal snapshot + $LOTUS_BINARY daemon --import-snapshot "$LOTUS_IMPORT_SNAPSHOT" --halt-after-import + # Block future inits + date > "$GATE" +fi +$LOTUS_BINARY daemon $ARGS diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index 25b84058d0a..08854555ffc 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -38,6 +38,15 @@ parts: override-build: | LDFLAGS="" make lotus lotus-miner lotus-worker cp lotus lotus-miner lotus-worker $SNAPCRAFT_PART_INSTALL + cp scripts/snap-lotus-entrypoint.sh $SNAPCRAFT_PART_INSTALL + +layout: + /var/lib/lotus: + symlink: $SNAP_COMMON/lotus + /var/lib/lotus-miner: + symlink: $SNAP_COMMON/lotus-miner + /var/lib/lotus-worker: + symlink: $SNAP_COMMON/lotus-worker apps: lotus: @@ -48,9 +57,9 @@ apps: - home environment: FIL_PROOFS_PARAMETER_CACHE: $SNAP_USER_COMMON/filecoin-proof-parameters - LOTUS_PATH: $SNAP_USER_COMMON/lotus - LOTUS_MINER_PATH: $SNAP_USER_COMMON/lotus-miner - LOTUS_WORKER_PATH: $SNAP_USER_COMMON/lotus-worker + LOTUS_PATH: $SNAP_COMMON/lotus + LOTUS_MINER_PATH: $SNAP_COMMON/lotus-miner + LOTUS_WORKER_PATH: $SNAP_COMMON/lotus-worker lotus-miner: command: lotus-miner plugs: @@ -59,9 +68,9 @@ apps: - opengl environment: FIL_PROOFS_PARAMETER_CACHE: $SNAP_USER_COMMON/filecoin-proof-parameters - LOTUS_PATH: $SNAP_USER_COMMON/lotus - LOTUS_MINER_PATH: $SNAP_USER_COMMON/lotus-miner - LOTUS_WORKER_PATH: $SNAP_USER_COMMON/lotus-worker + LOTUS_PATH: $SNAP_COMMON/lotus + LOTUS_MINER_PATH: $SNAP_COMMON/lotus-miner + LOTUS_WORKER_PATH: $SNAP_COMMON/lotus-worker lotus-worker: command: lotus-worker plugs: @@ -70,6 +79,18 @@ apps: - opengl environment: FIL_PROOFS_PARAMETER_CACHE: $SNAP_USER_COMMON/filecoin-proof-parameters - LOTUS_PATH: $SNAP_USER_COMMON/lotus - LOTUS_MINER_PATH: $SNAP_USER_COMMON/lotus-miner - LOTUS_WORKER_PATH: $SNAP_USER_COMMON/lotus-worker + LOTUS_PATH: $SNAP_COMMON/lotus + LOTUS_MINER_PATH: $SNAP_COMMON/lotus-miner + LOTUS_WORKER_PATH: $SNAP_COMMON/lotus-worker + lotus-daemon: + command: snap-lotus-entrypoint.sh + daemon: simple + install-mode: enable + plugs: + - network + - network-bind + environment: + FIL_PROOFS_PARAMETER_CACHE: $SNAP_COMMON/filecoin-proof-parameters + LOTUS_PATH: $SNAP_COMMON/lotus + LOTUS_MINER_PATH: $SNAP_COMMON/lotus-miner + LOTUS_WORKER_PATH: $SNAP_COMMON/lotus-worker diff --git a/testplans/lotus-soup/deals_e2e.go b/testplans/lotus-soup/deals_e2e.go index 6737bdae226..44eec2d7ab1 100644 --- a/testplans/lotus-soup/deals_e2e.go +++ b/testplans/lotus-soup/deals_e2e.go @@ -207,7 +207,9 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes t.RecordMessage("my balance: %d", balance) t.RecordMessage("creating payment channel; from=%s, to=%s, funds=%d", cl.Wallet.Address, recv.WalletAddr, balance) - channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance) + channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, balance, api.PaychGetOpts{ + OffChain: false, + }) if err != nil { return fmt.Errorf("failed to create payment channel: %w", err) } @@ -230,7 +232,9 @@ func initPaymentChannel(t *testkit.TestEnvironment, ctx context.Context, cl *tes // we wait for 2 confirmations, so we have the assurance the channel is tracked. t.RecordMessage("reloading paych; now it should have an address") - channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero()) + channel, err = cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, big.Zero(), api.PaychGetOpts{ + OffChain: false, + }) if err != nil { return fmt.Errorf("failed to reload payment channel: %w", err) } diff --git a/testplans/lotus-soup/go.mod b/testplans/lotus-soup/go.mod index c484f729066..079ad814ebc 100644 --- a/testplans/lotus-soup/go.mod +++ b/testplans/lotus-soup/go.mod @@ -9,7 +9,7 @@ require ( github.com/drand/drand v1.3.0 github.com/filecoin-project/go-address v0.0.6 github.com/filecoin-project/go-data-transfer v1.14.0 - github.com/filecoin-project/go-fil-markets v1.19.2-0.20220223141410-2460e15e07a8 + github.com/filecoin-project/go-fil-markets v1.20.0 github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-state-types v0.1.3 github.com/filecoin-project/go-storedcounter v0.1.0 diff --git a/testplans/lotus-soup/go.sum b/testplans/lotus-soup/go.sum index 525865ea9c4..cb0a1b762d1 100644 --- a/testplans/lotus-soup/go.sum +++ b/testplans/lotus-soup/go.sum @@ -428,6 +428,8 @@ github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7L github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= github.com/filecoin-project/go-fil-markets v1.19.2-0.20220223141410-2460e15e07a8 h1:cuV4t78W8tUgmqvsc6T6qbWuycg7GzPcNsTdAOxrTns= github.com/filecoin-project/go-fil-markets v1.19.2-0.20220223141410-2460e15e07a8/go.mod h1:OeR49x+NPMfZMMgOresxzecMAgUB+lna2kwJABPwnt8= +github.com/filecoin-project/go-fil-markets v1.20.0 h1:kP9A2otcRe4mTfL++GF0+d4PoFDI92E38lhM8GXFT9I= +github.com/filecoin-project/go-fil-markets v1.20.0/go.mod h1:OeR49x+NPMfZMMgOresxzecMAgUB+lna2kwJABPwnt8= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/testplans/lotus-soup/paych/stress.go b/testplans/lotus-soup/paych/stress.go index 85246603f0f..4f107bf5b67 100644 --- a/testplans/lotus-soup/paych/stress.go +++ b/testplans/lotus-soup/paych/stress.go @@ -124,7 +124,9 @@ func runSender(ctx context.Context, t *testkit.TestEnvironment, clients []*testk time.Sleep(20 * time.Second) - channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt) + channel, err := cl.FullApi.PaychGet(ctx, cl.Wallet.Address, recv.WalletAddr, channelAmt, api.PaychGetOpts{ + OffChain: false, + }) if err != nil { return fmt.Errorf("failed to create payment channel: %w", err) } diff --git a/tools/packer/lotus-snap.pkr.hcl b/tools/packer/lotus-snap.pkr.hcl new file mode 100644 index 00000000000..8ceb743d088 --- /dev/null +++ b/tools/packer/lotus-snap.pkr.hcl @@ -0,0 +1,84 @@ +variable "ci_workspace_bins" { + type = string + default = "./linux" +} + +variable "lotus_network" { + type = string + default = "mainnet" +} + +locals { + timestamp = regex_replace(timestamp(), "[- TZ:]", "") +} + +source "amazon-ebs" "lotus" { + ami_name = "lotus-${var.lotus_network}-snap-${local.timestamp}" + ami_regions = [ + "ap-east-1", + "ap-northeast-1", + "ap-northeast-2", + "ap-northeast-3", + "ap-south-1", + "ap-southeast-1", + "ap-southeast-2", + "ca-central-1", + "eu-central-1", + "eu-north-1", + "eu-west-1", + "eu-west-2", + "eu-west-3", + "sa-east-1", + "us-east-1", + "us-east-2", + "us-west-1", + "us-west-2", + ] + ami_groups = [ + # This causes the ami to be publicly-accessable. + "all", + ] + ami_description = "Lotus Filecoin AMI" + launch_block_device_mappings { + device_name = "/dev/sda1" + volume_size = 100 + delete_on_termination = true + } + + instance_type = "t2.micro" + source_ami_filter { + filters = { + name = "ubuntu/images/*ubuntu-focal-20.04-amd64-server-*" + root-device-type = "ebs" + virtualization-type = "hvm" + } + most_recent = true + owners = ["099720109477"] + } + ssh_username = "ubuntu" +} + +source "digitalocean" "lotus" { + droplet_name = "lotus-snap" + size = "s-1vcpu-1gb" + region = "nyc3" + image = "ubuntu-20-04-x64" + snapshot_name = "lotus-${var.lotus_network}-snap-${local.timestamp}" + ssh_username = "root" +} + +build { + sources = [ + "source.amazon-ebs.lotus", + "source.digitalocean.lotus", + ] + + provisioner "file" { + source = "./tools/packer/etc/motd" + destination = "motd" + } + # build it. + provisioner "shell" { + script = "./tools/packer/setup-snap.sh" + } +} diff --git a/tools/packer/lotus.pkr.hcl b/tools/packer/lotus.pkr.hcl index 8ef41613b14..cfaca83986e 100644 --- a/tools/packer/lotus.pkr.hcl +++ b/tools/packer/lotus.pkr.hcl @@ -63,19 +63,9 @@ source "amazon-ebs" "lotus" { ssh_username = "ubuntu" } -source "digitalocean" "lotus" { - droplet_name = "lotus-${var.lotus_network}" - size = "s-1vcpu-1gb" - region = "nyc3" - image = "ubuntu-20-04-x64" - snapshot_name = "lotus-${var.lotus_network}-${var.git_tag}-${local.timestamp}" - ssh_username = "root" -} - build { sources = [ "source.amazon-ebs.lotus", - "source.digitalocean.lotus", ] # Lotus software (from CI workspace) diff --git a/tools/packer/setup-snap.sh b/tools/packer/setup-snap.sh new file mode 100644 index 00000000000..2e921f8da9f --- /dev/null +++ b/tools/packer/setup-snap.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash + +# This script is executed by packer to setup the image. +# When this script is run, packer will have already copied binaries into the home directory of +# whichever user it has access too. This script is executed from within the home directory of that +# user. Bear in mind that different cloud providers, and different images on the same cloud +# provider will have a different initial user account. + +set -x + +# Become root, if we aren't already. +# Docker images will already be root. AMIs will have an SSH user account. +if [ x$UID != x0 ] +then + printf -v cmd_str '%q ' "$0" "$@" + exec sudo su -c "$cmd_str" +fi + +set -e + +MANAGED_FILES=( + /etc/motd +) + +# this is required on digitalocean, which does not have snap seeded correctly at this phase. +apt update +apt reinstall snapd + +snap install lotus-filecoin + +snap alias lotus-filecoin.lotus lotus +snap alias lotus-file.con.lotus-daemon lotus-daemon +snap alias lotus-filecoin.lotus-miner lotus-miner +snap alias lotus-filecoin.lotus-worker lotus-worker + +snap stop lotus-filecoin.lotus-daemon + +# Setup firewall +yes | ufw enable +ufw default deny incoming +ufw default allow outgoing +ufw allow ssh + +set +e + +curl -L https://raw.githubusercontent.com/digitalocean/marketplace-partners/master/scripts/90-cleanup.sh | bash diff --git a/tools/packer/setup.sh b/tools/packer/setup.sh index 6c0742254ff..2b190045e1b 100644 --- a/tools/packer/setup.sh +++ b/tools/packer/setup.sh @@ -65,3 +65,5 @@ ufw default deny incoming ufw default allow outgoing ufw allow ssh ufw allow 5678 #libp2p + +curl -L https://raw.githubusercontent.com/digitalocean/marketplace-partners/master/scripts/90-cleanup.sh | bash