Expire and republish records received via PutValue #388

Open · wants to merge 1 commit into base: master
5 changes: 5 additions & 0 deletions dht.go
@@ -53,6 +53,8 @@ type IpfsDHT struct {
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
providers *providers.ProviderManager

nonProvRecordsManager *NonProvRecordsManager

birth time.Time // When this peer started up

Validator record.Validator
@@ -98,13 +100,15 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
})

dht.proc.AddChild(dht.providers.Process())
dht.proc.AddChild(dht.nonProvRecordsManager.Process())
dht.Validator = cfg.Validator

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}

return dht, nil
}

@@ -156,6 +160,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID) *IpfsDHT {
}

dht.ctx = dht.newContextWithLocalTags(ctx)
dht.nonProvRecordsManager = NewNonProvRecordsManager(ctx, dht, dstore)

return dht
}
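For orientation, here is a minimal consumer sketch (not part of this diff; it assumes the go-libp2p API of this era, where libp2p.New takes a context) showing the effect of the wiring above: because the manager's process is registered as a child of the DHT's process, closing the DHT also stops the expiry and republish loops.

```go
package main

import (
	"context"

	libp2p "github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

func main() {
	ctx := context.Background()

	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// New calls makeDHT, which constructs the NonProvRecordsManager, and then
	// adds the manager's process as a child of the DHT's process.
	d, err := dht.New(ctx, h)
	if err != nil {
		panic(err)
	}

	// Closing the DHT tears down its goprocess tree, stopping the manager's
	// expiry and republish loops along with everything else.
	defer d.Close()
}
```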
1 change: 1 addition & 0 deletions go.mod
@@ -23,6 +23,7 @@ require (
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3
github.com/multiformats/go-multihash v0.0.5
github.com/multiformats/go-multistream v0.1.0
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
6 changes: 5 additions & 1 deletion handlers.go
@@ -121,7 +121,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) {
recordIsBad = true
}

if time.Since(recvtime) > MaxRecordAge {
if time.Since(recvtime) > maxNonProvRecordAge {
logger.Debug("old record found, tossing.")
recordIsBad = true
}
@@ -396,3 +396,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
func convertToDsKey(s []byte) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
}

func convertToOriginalKey(k string) ([]byte, error) {
Review comment (author): This is not used anymore. Need to remove this.

return base32.RawStdEncoding.DecodeString(k)
}
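For context on the review note above: the two helpers are meant to be inverses, since datastore keys are the RawStdEncoding base32 form of the original record key. A standalone round-trip sketch (helpers copied from this diff; the key is made up):

```go
package main

import (
	"bytes"
	"fmt"

	ds "github.com/ipfs/go-datastore"
	"github.com/whyrusleeping/base32"
)

// Copied from handlers.go: datastore keys are base32-encoded record keys.
func convertToDsKey(s []byte) ds.Key {
	return ds.NewKey(base32.RawStdEncoding.EncodeToString(s))
}

func convertToOriginalKey(k string) ([]byte, error) {
	return base32.RawStdEncoding.DecodeString(k)
}

func main() {
	orig := []byte("/v/some-key")
	dsKey := convertToDsKey(orig)

	// BaseNamespace strips the leading "/" that ds.NewKey adds.
	back, err := convertToOriginalKey(dsKey.BaseNamespace())
	if err != nil {
		panic(err)
	}
	fmt.Println(bytes.Equal(orig, back)) // true
}
```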
193 changes: 193 additions & 0 deletions non_prov_records.go
@@ -0,0 +1,193 @@
package dht

import (
"context"
"math/rand"
"strings"
"sync"
"time"

"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
u "github.com/ipfs/go-ipfs-util"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-kad-dht/providers"
recpb "github.com/libp2p/go-libp2p-record/pb"
)

// vars for cleaning up expired records
var nonProvRecordCleanupInterval = time.Hour

// maxNonProvRecordAge specifies the maximum time that any node will hold onto a record
// from the time it is received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying it is valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcast more frequently than once every 'maxNonProvRecordAge'.
var maxNonProvRecordAge = time.Hour * 36

// vars for republishing records
var nonProvRecordRePublishInterval = 1 * time.Hour
var nonProvRecordRePublishAge = 1 * time.Hour
var enableRepublishJitter = true

type NonProvRecordsManager struct {
Review comment (author): We cannot put this in a separate package, as IpfsDHT depends on this and this depends on IpfsDHT.

dht *IpfsDHT
ctx context.Context

proc goprocess.Process
dstore ds.Batching

cleanupInterval time.Duration // scan interval for expiring records

rePublishInterval time.Duration // scan interval for republishing records
}

func NewNonProvRecordsManager(ctx context.Context, dht *IpfsDHT, dstore ds.Batching) *NonProvRecordsManager {
m := new(NonProvRecordsManager)
m.dht = dht
m.ctx = ctx
m.dstore = dstore
m.proc = goprocessctx.WithContext(ctx)

// expire records beyond the max age
m.cleanupInterval = nonProvRecordCleanupInterval
m.proc.Go(m.expire)

// republish records older than prescribed age
m.rePublishInterval = nonProvRecordRePublishInterval
m.proc.Go(m.rePublish)

Review comment (author): @bigs We can have separate dedicated goroutines for expiry and republish, as we don't need to serialize access to a shared resource or synchronize between them.

return m
}

func (m *NonProvRecordsManager) Process() goprocess.Process {
return m.proc
}

func (m *NonProvRecordsManager) rePublish(proc goprocess.Process) {
for {
var d = 0 * time.Minute
// jitter minimizes the probability of all peers re-publishing together;
// the first peer to re-publish resets the receivedAt time on the record
// for all other peers among the K closest to the key, minimizing the
// number of republishes by those peers
if enableRepublishJitter {
d = time.Duration(rand.Intn(16)) * time.Minute
}

select {
case <-proc.Closing():
return
case <-time.After(m.rePublishInterval + d):
}

tFnc := func(t time.Time) bool {
return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
}

res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
if err != nil {
logger.Errorf("republish records proc: failed to run query against datastore, error is %s", err)
continue
}

var wg sync.WaitGroup
// semaphore to rate-limit number of concurrent PutValue calls
semaphore := make(chan struct{}, 5)
for {
e, ok := res.NextSync()
if !ok {
break
}

semaphore <- struct{}{}
wg.Add(1)
go func(e query.Result) {
defer func() {
<-semaphore
wg.Done()
}()

// unmarshal record
rec := new(recpb.Record)
if err := proto.Unmarshal(e.Value, rec); err != nil {
logger.Debugf("republish records proc: failed to unmarshal DHT record from datastore, error is %s", err)
return
}

// call put value
putCtx, cancel := context.WithTimeout(m.ctx, 2*time.Minute)
Review comment (author): @bigs Is 2 minutes a reasonable timeout here?

defer cancel()

// do not use e.Key here, as it is the transformed (base32) version of the original key;
// rec.GetKey() is the original key sent by the peer who put this record to the DHT
if err := m.dht.PutValue(putCtx, string(rec.GetKey()), rec.Value); err != nil {
logger.Debugf("republish records proc: failed to re-publish to the network, error is %s", err)
}
}(e)
}
wg.Wait()
}
}

func (m *NonProvRecordsManager) expire(proc goprocess.Process) {
for {
select {
case <-proc.Closing():
return
case <-time.After(m.cleanupInterval):
}

tFnc := func(t time.Time) bool {
return time.Since(t) > maxNonProvRecordAge
}

res, err := m.dstore.Query(query.Query{Filters: []query.Filter{&nonProvRecordFilter{tFnc}}})
if err != nil {
logger.Errorf("expire records proc: failed to run query against datastore, error is %s", err)
continue
}

for {
e, ok := res.NextSync()
if !ok {
break
}
if err := m.dstore.Delete(ds.RawKey(e.Key)); err != nil {
logger.Errorf("expire records proc: failed to delete key %s from datastore, error is %s", e.Key, err)
}
}
}
}

type timeFilterFnc = func(t time.Time) bool

type nonProvRecordFilter struct {
tFnc timeFilterFnc
}

func (f *nonProvRecordFilter) Filter(e query.Entry) bool {
// unmarshal record
rec := new(recpb.Record)
if err := proto.Unmarshal(e.Value, rec); err != nil {
logger.Debugf("expire records filter: failed to unmarshal DHT record from datastore, error is %s", err)
Review comment (author): Change the tag in the logging message (this filter is shared by the expire and republish scans, so the "expire records filter" tag is misleading).

return false
}

// should not be a provider record
if strings.HasPrefix(e.Key, providers.ProvidersKeyPrefix) {
return false
}

// parse received time
t, err := u.ParseRFC3339(rec.TimeReceived)
if err != nil {
logger.Debugf("expire records filter: failed to parse time in DHT record, error is %s", err)
Review comment (author): Change the tag in the logging message.

return false
}

// apply the time filter fnc to the received time
return f.tFnc(t)
}
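To make the two scan windows concrete, here is an illustrative sketch, written as if inside the dht package so it can reach the unexported names; the record contents are made up. A record received 2 hours ago satisfies the republish predicate (older than nonProvRecordRePublishAge of 1h, younger than maxNonProvRecordAge of 36h) but not the expiry predicate.

```go
package dht

import (
	"fmt"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/ipfs/go-datastore/query"
	u "github.com/ipfs/go-ipfs-util"
	recpb "github.com/libp2p/go-libp2p-record/pb"
)

// filterWindowsExample is illustrative only: it exercises nonProvRecordFilter
// with the same two time predicates used by the rePublish and expire loops.
func filterWindowsExample() {
	republishFilter := &nonProvRecordFilter{tFnc: func(t time.Time) bool {
		return time.Since(t) > nonProvRecordRePublishAge && time.Since(t) < maxNonProvRecordAge
	}}
	expireFilter := &nonProvRecordFilter{tFnc: func(t time.Time) bool {
		return time.Since(t) > maxNonProvRecordAge
	}}

	// A record received 2 hours ago: past the republish age (1h) but well
	// under the maximum age (36h).
	rec := &recpb.Record{
		Key:          []byte("/v/some-key"),
		Value:        []byte("some-value"),
		TimeReceived: u.FormatRFC3339(time.Now().Add(-2 * time.Hour)),
	}
	raw, _ := proto.Marshal(rec)

	// Entries are stored under the base32-transformed key, as in handlers.go.
	entry := query.Entry{Key: convertToDsKey(rec.Key).String(), Value: raw}

	fmt.Println(republishFilter.Filter(entry)) // true: eligible for republish
	fmt.Println(expireFilter.Filter(entry))    // false: not yet expired
}
```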