Skip to content

Commit

Permalink
1. Expire non-provider records older than MaxAge
Browse files Browse the repository at this point in the history
2. Original publisher should republish putvalue records
  • Loading branch information
aarshkshah1992 committed Sep 2, 2019
1 parent a12e621 commit 2a950ae
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 13 deletions.
5 changes: 5 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator

// proc to expire putValue records
recordExpiryProc := goprocessctx.WithContext(ctx)
recordExpiryProc.Go(dht.expireNonProviderRecords)
dht.proc.AddChild(recordExpiryProc)

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.2
github.com/multiformats/go-multihash v0.0.5
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
Expand Down
2 changes: 1 addition & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
recordIsBad = true
}

if time.Since(recvtime) > MaxRecordAge {
if time.Since(recvtime) > maxNonProviderRecordAge {
logger.Debug("old record found, tossing.")
recordIsBad = true
}
Expand Down
7 changes: 4 additions & 3 deletions providers/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
return pm
}

const providersKeyPrefix = "/providers/"
// ProvidersKeyPrefix is the prefix used for all provider record keys.
const ProvidersKeyPrefix = "/providers/"

func mkProvKey(k cid.Cid) string {
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
}

func (pm *ProviderManager) Process() goprocess.Process {
Expand Down Expand Up @@ -284,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {

// Now, kick off a GC of the datastore.
q, err := pm.dstore.Query(dsq.Query{
Prefix: providersKeyPrefix,
Prefix: ProvidersKeyPrefix,
})
if err != nil {
log.Error("provider record GC query failed: ", err)
Expand Down
2 changes: 1 addition & 1 deletion providers/providers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestProvidesExpire(t *testing.T) {
t.Fatal("providers map not cleaned up")
}

res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
if err != nil {
t.Fatal(err)
}
Expand Down
74 changes: 69 additions & 5 deletions records.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,30 @@ package dht
import (
"context"
"fmt"
"strings"
"time"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
u "github.com/ipfs/go-ipfs-util"
"github.com/jbenet/goprocess"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"

ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-kad-dht/providers"
recpb "github.com/libp2p/go-libp2p-record/pb"
)

// MaxRecordAge specifies the maximum time that any node will hold onto a record
// maxNonProviderRecordAge specifies the maximum time that any node will hold onto a record
// from the time it is received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
const MaxRecordAge = time.Hour * 36
// it must be rebroadcasted more frequently than once every 'maxNonProviderRecordAge'
var maxNonProviderRecordAge = time.Hour * 12

var defaultRecordsSweepInterval = time.Hour * 1

type pubkrs struct {
pubk ci.PubKey
Expand Down Expand Up @@ -135,3 +144,58 @@ func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.Pub
logger.Debugf("Got public key from node %v itself", p)
return pubk, nil
}

// expireNonProviderRecords is the body of the record-expiry process. It wakes
// up every defaultRecordsSweepInterval, runs one sweep over the datastore, and
// returns as soon as proc starts closing.
func (dht *IpfsDHT) expireNonProviderRecords(proc goprocess.Process) {
	for {
		select {
		case <-proc.Closing():
			return
		case <-time.After(defaultRecordsSweepInterval):
		}

		dht.sweepExpiredRecords()
	}
}

// sweepExpiredRecords runs a single expiry pass: it queries the datastore with
// expireRecordFilter and deletes every entry the filter selects. Failures are
// logged rather than returned so that the caller simply retries on the next
// sweep interval.
func (dht *IpfsDHT) sweepExpiredRecords() {
	res, err := dht.datastore.Query(query.Query{Filters: []query.Filter{&expireRecordFilter{}}})
	if err != nil {
		logger.Errorf("expire records proc: failed to run query against datastore, error is %+v", err)
		return
	}
	// Always release the query, including when the iteration stops early.
	defer res.Close()

	for {
		e, ok := res.NextSync()
		if !ok {
			return
		}
		// A result carrying an error has no usable key; abandon this sweep
		// instead of issuing a delete for an empty key.
		if e.Error != nil {
			logger.Errorf("expire records proc: failed to read query result, error is %+v", e.Error)
			return
		}
		if err := dht.datastore.Delete(ds.RawKey(e.Key)); err != nil {
			logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %+v", e.Key, err)
		}
	}
}

// expireRecordFilter is a datastore query filter that selects the
// non-provider DHT records which have outlived maxNonProviderRecordAge.
type expireRecordFilter struct{}

// Filter reports whether e should be expired. It returns true only when e is
// not stored under providers.ProvidersKeyPrefix, its value decodes as a
// recpb.Record, the record's TimeReceived parses as RFC 3339, and that time
// lies more than maxNonProviderRecordAge in the past. Entries that cannot be
// decoded or parsed are kept (false) and the failure is logged at debug level.
func (f *expireRecordFilter) Filter(e query.Entry) bool {
	// Check the key first: provider entries are never expired here, so there
	// is no point unmarshalling them (or logging a decode failure for them).
	if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
		return false
	}

	rec := new(recpb.Record)
	if err := proto.Unmarshal(e.Value, rec); err != nil {
		logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %+v", err)
		return false
	}

	t, err := u.ParseRFC3339(rec.TimeReceived)
	if err != nil {
		logger.Debugf("expire records filter: failed to parse time in DHT record, error is %+v", err)
		return false
	}

	return time.Since(t) > maxNonProviderRecordAge
}
106 changes: 103 additions & 3 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ package dht
import (
"context"
"crypto/rand"
"github.com/libp2p/go-libp2p-core/test"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/test"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/assert"

ds "github.com/ipfs/go-datastore"
u "github.com/ipfs/go-ipfs-util"
ci "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
record "github.com/libp2p/go-libp2p-record"
tnet "github.com/libp2p/go-libp2p-testing/net"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p-testing/net"
)

// Check that GetPublicKey() correctly extracts a public key
Expand Down Expand Up @@ -305,3 +311,97 @@ func TestPubkeyGoodKeyFromDHTGoodKeyDirect(t *testing.T) {
t.Fatal("got incorrect public key")
}
}

// TestExpireNonProviderRecords checks that the record-expiry sweep removes
// non-provider records once they are older than maxNonProviderRecordAge, and
// that it leaves provider records alone. The sweep interval and the max age
// are package-level variables; the test shortens them and restores the
// original values on exit.
func TestExpireNonProviderRecords(t *testing.T) {
	sVal := defaultRecordsSweepInterval
	defer func() { defaultRecordsSweepInterval = sVal }()

	defaultRecordsSweepInterval = 20 * time.Millisecond

	// create dht
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	d := setupDHT(ctx, t, false)

	// putRecord stores a non-provider record through the PUT_VALUE handler.
	putRecord := func(key string, value []byte) error {
		rec := record.MakePutRecord(key, value)
		pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
		pmes.Record = rec
		_, err := d.handlePutValue(ctx, "testpeer", pmes)
		return err
	}

	// addProv registers "testpeer" as a provider for c through the
	// ADD_PROVIDER handler.
	addProv := func(c cid.Cid) error {
		msg, err := d.makeProvRecord(c)
		// Check the error before touching msg; it must not be dereferenced
		// when makeProvRecord fails.
		if err != nil {
			return err
		}
		pi := peer.AddrInfo{
			ID:    "testpeer",
			Addrs: d.host.Addrs(),
		}
		msg.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})

		_, err = d.handleAddProvider(ctx, "testpeer", msg)
		return err
	}

	// getProv looks up the providers of c through the GET_PROVIDERS handler.
	getProv := func(c cid.Cid) (*pb.Message, error) {
		pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, c.Bytes(), 0)
		m, err := d.handleGetProviders(ctx, "test peer", pmes)
		return m, err
	}

	// put non-provider record 1 with current time
	key1 := "/v/key1"
	value1 := []byte("v1")
	assert.NoError(t, putRecord(key1, value1))

	// put non-provider record 2 with current time
	key2 := "/v/key2"
	value2 := []byte("v2")
	assert.NoError(t, putRecord(key2, value2))

	// add provider with current time
	mh, err := multihash.Sum([]byte("data"), multihash.SHA2_256, -1)
	assert.NoError(t, err)
	c := cid.NewCidV0(mh)
	assert.NoError(t, addProv(c))

	// let several sweeps run; none of them should delete anything yet
	time.Sleep(100 * time.Millisecond)

	// get & verify all are present

	// Read the non-provider records straight from the datastore: the get-value
	// handler path also tosses records older than maxNonProviderRecordAge, and
	// this test must exercise only the expiry process.
	_, err = d.datastore.Get(convertToDsKey([]byte(key1)))
	assert.NoError(t, err)

	_, err = d.datastore.Get(convertToDsKey([]byte(key2)))
	assert.NoError(t, err)

	// ensure provider record is still available
	m, err := getProv(c)
	assert.NoError(t, err)
	assert.NotEmpty(t, m.ProviderPeers)

	// change max age to 100 milliseconds
	mVal := maxNonProviderRecordAge
	defer func() { maxNonProviderRecordAge = mVal }()
	maxNonProviderRecordAge = 100 * time.Millisecond

	// the sweep will now remove both non-provider records
	time.Sleep(100 * time.Millisecond)

	// verify both non-provider records are absent
	_, err = d.datastore.Get(convertToDsKey([]byte(key1)))
	assert.Equal(t, ds.ErrNotFound, err)

	_, err = d.datastore.Get(convertToDsKey([]byte(key2)))
	assert.Equal(t, ds.ErrNotFound, err)

	// but, provider record will still be available
	m, err = getProv(c)
	assert.NoError(t, err)
	assert.NotEmpty(t, m.ProviderPeers)
}
24 changes: 24 additions & 0 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
// results will wait for the channel to drain.
var asyncQueryBuffer = 10

var putValueRepublishInterval = 6 * time.Hour

// This file implements the Routing interface for the IpfsDHT struct.

// Basic Put/Get
Expand Down Expand Up @@ -98,6 +100,28 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
}(p)
}
wg.Wait()

// original publisher should keep re-publishing the record because the network isn't `steady`/`stable`
// and the K closest peers we just published to can become unavailable / no longer be the K closest
go func() {
for {
select {
case <-dht.proc.Closing():
return
case <-time.After(putValueRepublishInterval):
// TODO:We can not re-use the original context here as it may have expired
// But, is it fair to use this one ?
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
if err := dht.PutValue(ctx, key, value, opts...); err != nil {
logger.Errorf("putValue republish proc: failed to republish key %s, error is %+v", key, err)
} else {
logger.Debugf("putValue republish proc: successfully republished key %s", key)
}
cancel()
}
}
}()

return nil
}

Expand Down

0 comments on commit 2a950ae

Please sign in to comment.