Skip to content

Commit

Permalink
Merge pull request #296 from jbenet/consistency
Browse files Browse the repository at this point in the history
Ipns Consistency
  • Loading branch information
jbenet committed Nov 16, 2014
2 parents e290b54 + f45d575 commit 6f8569d
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 49 deletions.
4 changes: 4 additions & 0 deletions core/core.go
Expand Up @@ -31,6 +31,8 @@ import (
ctxc "github.com/jbenet/go-ipfs/util/ctxcloser"
)

const IpnsValidatorTag = "ipns"

var log = u.Logger("core")

// IpfsNode is IPFS Core module. It represents an IPFS instance.
Expand Down Expand Up @@ -156,6 +158,8 @@ func NewIpfsNode(cfg *config.Config, online bool) (n *IpfsNode, err error) {

// setup routing service
dhtRouting := dht.NewDHT(ctx, n.Identity, n.Peerstore, n.Network, dhtService, n.Datastore)
dhtRouting.Validators[IpnsValidatorTag] = namesys.ValidateIpnsRecord

// TODO(brian): perform this inside NewDHT factory method
dhtService.SetHandler(dhtRouting) // wire the handler to the service.
n.Routing = dhtRouting
Expand Down
54 changes: 51 additions & 3 deletions namesys/internal/pb/namesys.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions namesys/internal/pb/namesys.proto
@@ -1,6 +1,13 @@
package namesys.pb;

message IpnsEntry {
enum ValidityType {
// setting an EOL says "this record is valid until..."
EOL = 0;
}
required bytes value = 1;
required bytes signature = 2;

optional ValidityType validityType = 3;
optional bytes validity = 4;
}
70 changes: 62 additions & 8 deletions namesys/publisher.go
@@ -1,6 +1,8 @@
package namesys

import (
"bytes"
"errors"
"fmt"
"time"

Expand All @@ -14,6 +16,14 @@ import (
u "github.com/jbenet/go-ipfs/util"
)

// ErrExpiredRecord should be returned when an ipns record is
// invalid due to being too old
var ErrExpiredRecord = errors.New("expired record")

// ErrUnrecognizedValidity is returned when an IpnsRecord has an
// unknown validity type.
var ErrUnrecognizedValidity = errors.New("unrecognized validity type")

// ipnsPublisher is capable of publishing and resolving names to the IPFS
// routing system.
type ipnsPublisher struct {
Expand All @@ -33,34 +43,40 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {
// validate `value` is a ref (multihash)
_, err := mh.FromB58String(value)
if err != nil {
log.Errorf("hash cast failed: %s", value)
return fmt.Errorf("publish value must be str multihash. %v", err)
}

ctx := context.TODO()
data, err := createRoutingEntryData(k, value)
if err != nil {
log.Error("entry creation failed.")
return err
}
pubkey := k.GetPublic()
pkbytes, err := pubkey.Bytes()
if err != nil {
return nil
log.Error("pubkey getbytes failed.")
return err
}

nameb := u.Hash(pkbytes)
namekey := u.Key(nameb).Pretty()
ipnskey := u.Hash([]byte("/ipns/" + namekey))
namekey := u.Key("/pk/" + string(nameb))

log.Debugf("Storing pubkey at: %s", namekey)
// Store associated public key
timectx, _ := context.WithDeadline(ctx, time.Now().Add(time.Second*4))
err = p.routing.PutValue(timectx, u.Key(nameb), pkbytes)
err = p.routing.PutValue(timectx, namekey, pkbytes)
if err != nil {
return err
}

// Store ipns entry at h("/ipns/"+b58(h(pubkey)))
ipnskey := u.Key("/ipns/" + string(nameb))

log.Debugf("Storing ipns entry at: %s", ipnskey)
// Store ipns entry at "/ipns/"+b58(h(pubkey))
timectx, _ = context.WithDeadline(ctx, time.Now().Add(time.Second*4))
err = p.routing.PutValue(timectx, u.Key(ipnskey), data)
err = p.routing.PutValue(timectx, ipnskey, data)
if err != nil {
return err
}
Expand All @@ -70,11 +86,49 @@ func (p *ipnsPublisher) Publish(k ci.PrivKey, value string) error {

func createRoutingEntryData(pk ci.PrivKey, val string) ([]byte, error) {
entry := new(pb.IpnsEntry)
sig, err := pk.Sign([]byte(val))

entry.Value = []byte(val)
typ := pb.IpnsEntry_EOL
entry.ValidityType = &typ
entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24)))

sig, err := pk.Sign(ipnsEntryDataForSig(entry))
if err != nil {
return nil, err
}
entry.Signature = sig
entry.Value = []byte(val)
return proto.Marshal(entry)
}

func ipnsEntryDataForSig(e *pb.IpnsEntry) []byte {
return bytes.Join([][]byte{
e.Value,
e.Validity,
[]byte(fmt.Sprint(e.GetValidityType())),
},
[]byte{})
}

// ValidateIpnsRecord implements ValidatorFunc and verifies that the
// given 'val' is an IpnsEntry and that that entry is valid.
func ValidateIpnsRecord(k u.Key, val []byte) error {
entry := new(pb.IpnsEntry)
err := proto.Unmarshal(val, entry)
if err != nil {
return err
}
switch entry.GetValidityType() {
case pb.IpnsEntry_EOL:
t, err := u.ParseRFC3339(string(entry.GetValue()))
if err != nil {
log.Error("Failed parsing time for ipns record EOL")
return err
}
if time.Now().After(t) {
return ErrExpiredRecord
}
default:
return ErrUnrecognizedValidity
}
return nil
}
8 changes: 5 additions & 3 deletions namesys/routing.go
Expand Up @@ -46,7 +46,7 @@ func (r *routingResolver) Resolve(name string) (string, error) {

// use the routing system to get the name.
// /ipns/<name>
h := u.Hash([]byte("/ipns/" + name))
h := []byte("/ipns/" + string(hash))

ipnsKey := u.Key(h)
val, err := r.routing.GetValue(ctx, ipnsKey)
Expand All @@ -63,7 +63,7 @@ func (r *routingResolver) Resolve(name string) (string, error) {

// name should be a public key retrievable from ipfs
// /ipfs/<name>
key := u.Key(hash)
key := u.Key("/pk/" + string(hash))
pkval, err := r.routing.GetValue(ctx, key)
if err != nil {
log.Warning("RoutingResolve PubKey Get failed.")
Expand All @@ -75,9 +75,11 @@ func (r *routingResolver) Resolve(name string) (string, error) {
if err != nil {
return "", err
}
hsh, _ := pk.Hash()
log.Debugf("pk hash = %s", u.Key(hsh))

// check sig with pk
if ok, err := pk.Verify(entry.GetValue(), entry.GetSignature()); err != nil || !ok {
if ok, err := pk.Verify(ipnsEntryDataForSig(entry), entry.GetSignature()); err != nil || !ok {
return "", fmt.Errorf("Invalid value. Not signed by PrivateKey corresponding to %v", pk)
}

Expand Down
63 changes: 53 additions & 10 deletions routing/dht/dht.go
Expand Up @@ -60,6 +60,9 @@ type IpfsDHT struct {
//lock to make diagnostics work better
diaglock sync.Mutex

// record validator funcs
Validators map[string]ValidatorFunc

ctxc.ContextCloser
}

Expand All @@ -82,6 +85,9 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.birth = time.Now()

dht.Validators = make(map[string]ValidatorFunc)
dht.Validators["pk"] = ValidatePublicKeyRecord

if doPinging {
dht.Children().Add(1)
go dht.PingRoutine(time.Second * 10)
Expand Down Expand Up @@ -215,16 +221,16 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa

// putValueToNetwork stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {
key string, rec *pb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
pmes.Value = value
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
return err
}

if !bytes.Equal(rpmes.Value, pmes.Value) {
if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
return errors.New("value not put correctly")
}
return nil
Expand Down Expand Up @@ -260,11 +266,17 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
return nil, nil, err
}

log.Debugf("pmes.GetValue() %v", pmes.GetValue())
if value := pmes.GetValue(); value != nil {
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
log.Debug("getValueOrPeers: got value")
return value, nil, nil

// make sure record is still valid
err = dht.verifyRecord(record)
if err != nil {
log.Error("Received invalid record!")
return nil, nil, err
}
return record.GetValue(), nil, nil
}

// TODO decide on providers. This probably shouldn't be happening.
Expand Down Expand Up @@ -325,10 +337,15 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
continue
}

if value := pmes.GetValue(); value != nil {
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value

err := dht.verifyRecord(record)
if err != nil {
return nil, err
}
dht.providers.AddProvider(key, p)
return value, nil
return record.GetValue(), nil
}
}
return nil, routing.ErrNotFound
Expand All @@ -338,21 +355,47 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
log.Debug("getLocal %s", key)
v, err := dht.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
log.Debug("found in db")

byt, ok := v.([]byte)
if !ok {
return nil, errors.New("value stored in datastore not []byte")
}
return byt, nil
rec := new(pb.Record)
err = proto.Unmarshal(byt, rec)
if err != nil {
return nil, err
}

// TODO: 'if paranoid'
if u.Debug {
err = dht.verifyRecord(rec)
if err != nil {
log.Errorf("local record verify failed: %s", err)
return nil, err
}
}

return rec.GetValue(), nil
}

// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
return dht.datastore.Put(key.DsKey(), value)
rec, err := dht.makePutRecord(key, value)
if err != nil {
return err
}
data, err := proto.Marshal(rec)
if err != nil {
return err
}

return dht.datastore.Put(key.DsKey(), data)
}

// Update signals to all routingTables to Update their last-seen status
Expand Down

0 comments on commit 6f8569d

Please sign in to comment.