Skip to content

Commit

Permalink
feat: provider deals pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 8, 2022
1 parent b328bea commit 223a5a4
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
49 changes: 49 additions & 0 deletions storagemarket/impl/provider.go
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"os"
"sort"
"time"

"github.com/hannahhoward/go-pubsub"
Expand Down Expand Up @@ -454,6 +455,22 @@ func (p *Provider) RetryDealPublishing(propcid cid.Cid) error {
return p.deals.Send(propcid, storagemarket.ProviderEventRestart)
}

func (p *Provider) GetLocalDeal(cid cid.Cid) (storagemarket.MinerDeal, error) {
var out storagemarket.MinerDeal
if err := p.deals.Get(cid).Get(&out); err != nil {
return storagemarket.MinerDeal{}, err
}
return out, nil
}

func (p *Provider) LocalDealCount() (int, error) {
var out []storagemarket.MinerDeal
if err := p.deals.List(&out); err != nil {
return 0, err
}
return len(out), nil
}

// ListLocalDeals lists deals processed by this storage provider
func (p *Provider) ListLocalDeals() ([]storagemarket.MinerDeal, error) {
var out []storagemarket.MinerDeal
Expand All @@ -463,6 +480,38 @@ func (p *Provider) ListLocalDeals() ([]storagemarket.MinerDeal, error) {
return out, nil
}

func (p *Provider) ListLocalDealsPage(offsetPropCid *cid.Cid, count int) ([]storagemarket.MinerDeal, error) {
if count == 0 {
return []storagemarket.MinerDeal{}, nil
}

// Get all deals
var deals []storagemarket.MinerDeal
if err := p.deals.List(&deals); err != nil {
return nil, err
}

// Sort by creation time descending
sort.Slice(deals, func(i, j int) bool {
return deals[i].CreationTime.Time().After(deals[j].CreationTime.Time())
})

// Iterate through deals until we reach the target signed proposal cid,
// then add deals from that point up to count
page := make([]storagemarket.MinerDeal, 0, count)
for _, dl := range deals {
if offsetPropCid == nil || dl.ProposalCid == *offsetPropCid {
page = append(page, dl)
offsetPropCid = nil // add all deals from this point on
}
if len(page) == count {
return page, nil
}
}

return page, nil
}

// SetAsk configures the storage miner's ask with the provided price,
// duration, and options. Any previously-existing ask is replaced.
func (p *Provider) SetAsk(price abi.TokenAmount, verifiedPrice abi.TokenAmount, duration abi.ChainEpoch, options ...storagemarket.StorageAskOption) error {
Expand Down
41 changes: 41 additions & 0 deletions storagemarket/impl/provider_test.go
Expand Up @@ -168,6 +168,47 @@ func TestProvider_Migrations(t *testing.T) {
}
require.Equal(t, expectedDeal, deal)
}

// Verify get deal by signed proposal cid
deal, err := provider.GetLocalDeal(deals[0].ProposalCid)
require.NoError(t, err)
require.Equal(t, deals[0].ProposalCid, deal.ProposalCid)

// Verify the deal count
count, err := provider.LocalDealCount()
require.NoError(t, err)
require.Equal(t, len(deals), count)

// Verify get a page of deals without a nil offset proposal cid
listedDeals, err := provider.ListLocalDealsPage(nil, len(deals))
require.NoError(t, err)
require.Len(t, listedDeals, len(deals))
for i, dl := range listedDeals {
if i == 0 {
continue
}
// Verify descending order by creation time
require.True(t, dl.CreationTime.Time().Before(listedDeals[i-1].CreationTime.Time()))
}
firstDeal := listedDeals[0]
secondDeal := listedDeals[1]
thirdDeal := listedDeals[2]

// Verify get a page of deals with a nil offset proposal cid and with a limit
listedDeals, err = provider.ListLocalDealsPage(nil, 2)
require.NoError(t, err)
require.Len(t, listedDeals, 2)
// Verify correct deals
require.Equal(t, firstDeal.ProposalCid, listedDeals[0].ProposalCid)
require.Equal(t, secondDeal.ProposalCid, listedDeals[1].ProposalCid)

// Verify get a page of deals with an offset proposal cid and with a limit
listedDeals, err = provider.ListLocalDealsPage(&secondDeal.ProposalCid, 2)
require.NoError(t, err)
require.Len(t, listedDeals, 2)
// Verify correct deals
require.Equal(t, secondDeal.ProposalCid, listedDeals[0].ProposalCid)
require.Equal(t, thirdDeal.ProposalCid, listedDeals[1].ProposalCid)
}

func TestHandleDealStream(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions storagemarket/provider.go
Expand Up @@ -36,9 +36,20 @@ type StorageProvider interface {
// GetAsk returns the storage miner's ask, or nil if one does not exist.
GetAsk() *SignedStorageAsk

// GetLocalDeal gets a deal by signed proposal cid
GetLocalDeal(cid cid.Cid) (MinerDeal, error)

// LocalDealCount gets the number of local deals
LocalDealCount() (int, error)

// ListLocalDeals lists deals processed by this storage provider
ListLocalDeals() ([]MinerDeal, error)

// ListLocalDealsPage lists deals by creation time descending, starting
// at the deal with the given signed proposal cid and returning up to
// count deals
ListLocalDealsPage(offsetPropCid *cid.Cid, count int) ([]MinerDeal, error)

// AddStorageCollateral adds storage collateral
AddStorageCollateral(ctx context.Context, amount abi.TokenAmount) error

Expand Down

0 comments on commit 223a5a4

Please sign in to comment.