Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Merge pull request #81 from libp2p/feat/77
Browse files Browse the repository at this point in the history
Further Optimizations and refactor benchmarking code.
  • Loading branch information
aarshkshah1992 committed May 7, 2020
2 parents 69090b2 + 9335a6e commit 02dc2ad
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 70 deletions.
105 changes: 90 additions & 15 deletions benchmark_test.go
Expand Up @@ -2,16 +2,36 @@ package noise

import (
"context"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/sec"
"golang.org/x/crypto/poly1305"
"io"
"io/ioutil"
"math/rand"
"net"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/sec"
)

type testMode int

const (
readBufferGtEncMsg testMode = iota
readBufferLtPlainText
)

var bcs = map[string]struct {
m testMode
}{
"readBuffer > encrypted message": {
readBufferGtEncMsg,
},
"readBuffer < decrypted plaintext": {
readBufferLtPlainText,
},
}

func makeTransport(b *testing.B) *Transport {
b.Helper()

Expand Down Expand Up @@ -78,28 +98,48 @@ func (b benchenv) connect(stopTimer bool) (*secureSession, *secureSession) {
return initSession.(*secureSession), respSession.(*secureSession)
}

func drain(r io.Reader, done chan<- error) {
_, err := io.Copy(ioutil.Discard, r)
func drain(r io.Reader, done chan<- error, writeTo io.Writer) {
_, err := io.Copy(writeTo, r)
done <- err
}

func sink(dst io.WriteCloser, src io.Reader, done chan<- error) {
_, err := io.Copy(dst, src)
type discardWithBuffer struct {
buf []byte
io.Writer
}

func (d *discardWithBuffer) ReadFrom(r io.Reader) (n int64, err error) {
readSize := 0
for {
readSize, err = r.Read(d.buf)
n += int64(readSize)
if err != nil {
if err == io.EOF {
return n, nil
}
return
}
}
}

func sink(dst io.WriteCloser, src io.Reader, done chan<- error, buf []byte) {
_, err := io.CopyBuffer(dst, src, buf)
if err != nil {
done <- err
}
done <- dst.Close()
}

func pipeRandom(src rand.Source, w io.WriteCloser, r io.Reader, n int64) error {
func pipeRandom(src rand.Source, w io.WriteCloser, r io.Reader, n int64, plainTextBuf []byte,
writeTo io.Writer) error {
rnd := rand.New(src)
lr := io.LimitReader(rnd, n)

writeCh := make(chan error, 1)
readCh := make(chan error, 1)

go sink(w, lr, writeCh)
go drain(r, readCh)
go sink(w, lr, writeCh, plainTextBuf)
go drain(r, readCh, writeTo)

writeDone := false
readDone := false
Expand All @@ -121,39 +161,74 @@ func pipeRandom(src rand.Source, w io.WriteCloser, r io.Reader, n int64) error {
return nil
}

func benchDataTransfer(b *benchenv, size int64) {
func benchDataTransfer(b *benchenv, dataSize int64, m testMode) {
var totalBytes int64
var totalTime time.Duration

plainTextBufs := make([][]byte, 61)
writeTos := make(map[int]io.Writer)
for i := 0; i < len(plainTextBufs); i++ {
var rbuf []byte
// plaintext will be 2 KB to 62 KB
plainTextBufs[i] = make([]byte, (i+2)*1024)
switch m {
case readBufferGtEncMsg:
rbuf = make([]byte, len(plainTextBufs[i])+poly1305.TagSize+1)
case readBufferLtPlainText:
rbuf = make([]byte, len(plainTextBufs[i])-2)
}
writeTos[i] = &discardWithBuffer{rbuf, ioutil.Discard}
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
initSession, respSession := b.connect(true)

start := time.Now()
err := pipeRandom(b.rndSrc, initSession, respSession, size)

bufi := i % len(plainTextBufs)
err := pipeRandom(b.rndSrc, initSession, respSession, dataSize, plainTextBufs[bufi], writeTos[bufi])
if err != nil {
b.Fatalf("error sending random data: %s", err)
}
elapsed := time.Since(start)
totalTime += elapsed
totalBytes += size
totalBytes += dataSize
}
bytesPerSec := float64(totalBytes) / totalTime.Seconds()
b.ReportMetric(bytesPerSec, "bytes/sec")
}

type bc struct {
plainTextChunkLen int64
readBufferLen int64
}

func BenchmarkTransfer1MB(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024)
for n, bc := range bcs {
b.Run(n, func(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024, bc.m)
})
}

}

func BenchmarkTransfer100MB(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024*100)
for n, bc := range bcs {
b.Run(n, func(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024*100, bc.m)
})
}
}

func BenchmarkTransfer500Mb(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024*500)
for n, bc := range bcs {
b.Run(n, func(b *testing.B) {
benchDataTransfer(setupEnv(b), 1024*1024*500, bc.m)
})
}
}

func (b benchenv) benchHandshake() {
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Expand Up @@ -9,5 +9,6 @@ require (
github.com/libp2p/go-libp2p v0.8.1
github.com/libp2p/go-libp2p-core v0.5.1
github.com/multiformats/go-multiaddr v0.2.1
github.com/stretchr/testify v1.5.1
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
)
)
47 changes: 36 additions & 11 deletions handshake.go
Expand Up @@ -3,7 +3,10 @@ package noise
import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/crypto/poly1305"
"time"

"github.com/flynn/noise"
Expand Down Expand Up @@ -54,15 +57,21 @@ func (s *secureSession) runHandshake(ctx context.Context) error {
// schedule the deadline removal once we're done handshaking.
defer s.SetDeadline(time.Time{})
}
// TODO: else case (transport doesn't support native timeouts); spin off
// a goroutine to monitor the context cancellation and pull the rug
// from under by closing the connection altogether.
}

// We can re-use this buffer for all handshake messages as it's size
// will be the size of the maximum handshake message for the Noise XX pattern.
// Also, since we prefix every noise handshake message with it's length, we need to account for
// it when we fetch the buffer from the pool
maxMsgSize := 2*noise.DH25519.DHLen() + len(payload) + 2*poly1305.TagSize
hbuf := pool.Get(maxMsgSize + LengthPrefixLength)
defer pool.Put(hbuf)

if s.initiator {
// stage 0 //
// do not send the payload just yet, as it would be plaintext; not secret.
err = s.sendHandshakeMessage(hs, nil)
// Handshake Msg Len = len(DH ephemeral key)
err = s.sendHandshakeMessage(hs, nil, hbuf)
if err != nil {
return fmt.Errorf("error sending handshake message: %w", err)
}
Expand All @@ -78,7 +87,8 @@ func (s *secureSession) runHandshake(ctx context.Context) error {
}

// stage 2 //
err = s.sendHandshakeMessage(hs, payload)
// Handshake Msg Len = len(DHT static key) + MAC(static key is encrypted) + len(Payload) + MAC(payload is encrypted)
err = s.sendHandshakeMessage(hs, payload, hbuf)
if err != nil {
return fmt.Errorf("error sending handshake message: %w", err)
}
Expand All @@ -90,7 +100,9 @@ func (s *secureSession) runHandshake(ctx context.Context) error {
}

// stage 1 //
err = s.sendHandshakeMessage(hs, payload)
// Handshake Msg Len = len(DH ephemeral key) + len(DHT static key) + MAC(static key is encrypted) + len(Payload) +
//MAC(payload is encrypted)
err = s.sendHandshakeMessage(hs, payload, hbuf)
if err != nil {
return fmt.Errorf("error sending handshake message: %w", err)
}
Expand Down Expand Up @@ -129,13 +141,18 @@ func (s *secureSession) setCipherStates(cs1, cs2 *noise.CipherState) {
// If payload is non-empty, it will be included in the handshake message.
// If this is the final message in the sequence, calls setCipherStates
// to initialize cipher states.
func (s *secureSession) sendHandshakeMessage(hs *noise.HandshakeState, payload []byte) error {
buf, cs1, cs2, err := hs.WriteMessage(nil, payload)
func (s *secureSession) sendHandshakeMessage(hs *noise.HandshakeState, payload []byte, hbuf []byte) error {
// the first two bytes will be the length of the noise handshake message.
bz, cs1, cs2, err := hs.WriteMessage(hbuf[:LengthPrefixLength], payload)
if err != nil {
return err
}

_, err = s.writeMsgInsecure(buf)
// bz will also include the length prefix as we passed a slice of LengthPrefixLength length
// to hs.Write().
binary.BigEndian.PutUint16(bz, uint16(len(bz)-LengthPrefixLength))

_, err = s.writeMsgInsecure(bz)
if err != nil {
return err
}
Expand All @@ -154,11 +171,19 @@ func (s *secureSession) sendHandshakeMessage(hs *noise.HandshakeState, payload [
// If this is the final message in the sequence, it calls setCipherStates
// to initialize cipher states.
func (s *secureSession) readHandshakeMessage(hs *noise.HandshakeState) ([]byte, error) {
raw, err := s.readMsgInsecure()
l, err := s.readNextInsecureMsgLen()
if err != nil {
return nil, err
}
msg, cs1, cs2, err := hs.ReadMessage(nil, raw)

buf := pool.Get(l)
defer pool.Put(buf)

if err := s.readNextMsgInsecure(buf); err != nil {
return nil, err
}

msg, cs1, cs2, err := hs.ReadMessage(nil, buf)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion integration_test.go
Expand Up @@ -128,7 +128,6 @@ func TestLibp2pIntegration(t *testing.T) {
}

<-doneCh
fmt.Println("fin")
}

func writeRandomPayloadAndClose(t *testing.T, stream net.Stream) error {
Expand Down

0 comments on commit 02dc2ad

Please sign in to comment.