Skip to content

Commit

Permalink
Merge pull request #148 from libp2p/feat/stream-deadlines
Browse files Browse the repository at this point in the history
update dependencies and add deadline methods to streams
  • Loading branch information
whyrusleeping committed Nov 9, 2016
2 parents bb6a43c + 6f804db commit 1c10ed4
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 52 deletions.
2 changes: 1 addition & 1 deletion p2p/host/basic/basic_host.go
Expand Up @@ -21,7 +21,7 @@ import (
msmux "github.com/whyrusleeping/go-multistream"
)

var log = logging.Logger("github.com/libp2p/go-libp2p/p2p/host/basic")
var log = logging.Logger("basichost")

// Option is a type used to pass in options to the host.
type Option int
Expand Down
9 changes: 4 additions & 5 deletions p2p/net/mock/mock_link.go
Expand Up @@ -2,7 +2,7 @@ package mocknet

import (
// "fmt"
"io"
"net"
"sync"
"time"

Expand Down Expand Up @@ -45,11 +45,10 @@ func (l *link) newConnPair(dialer *peernet) (*conn, *conn) {
}

func (l *link) newStreamPair() (*stream, *stream) {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
a, b := net.Pipe()

s1 := NewStream(w2, r1)
s2 := NewStream(w1, r2)
s1 := NewStream(a)
s2 := NewStream(b)
return s1, s2
}

Expand Down
36 changes: 23 additions & 13 deletions p2p/net/mock/mock_stream.go
Expand Up @@ -3,6 +3,7 @@ package mocknet
import (
"bytes"
"io"
"net"
"time"

process "github.com/jbenet/goprocess"
Expand All @@ -12,8 +13,7 @@ import (

// stream implements inet.Stream
type stream struct {
io.Reader
io.Writer
Pipe net.Conn
conn *conn
toDeliver chan *transportObject
proc process.Process
Expand All @@ -26,10 +26,9 @@ type transportObject struct {
arrivalTime time.Time
}

func NewStream(w io.Writer, r io.Reader) *stream {
func NewStream(p net.Conn) *stream {
s := &stream{
Reader: r,
Writer: w,
Pipe: p,
toDeliver: make(chan *transportObject),
}

Expand Down Expand Up @@ -70,12 +69,7 @@ func (s *stream) teardown() error {
// at this point, no streams are writing.

s.conn.removeStream(s)
if r, ok := (s.Reader).(io.Closer); ok {
r.Close()
}
if w, ok := (s.Writer).(io.Closer); ok {
w.Close()
}
s.Pipe.Close()
s.conn.net.notifyAll(func(n inet.Notifiee) {
n.ClosedStream(s.conn.net, s)
})
Expand All @@ -86,6 +80,22 @@ func (s *stream) Conn() inet.Conn {
return s.conn
}

func (s *stream) SetDeadline(t time.Time) error {
return s.Pipe.SetDeadline(t)
}

func (s *stream) SetReadDeadline(t time.Time) error {
return s.Pipe.SetReadDeadline(t)
}

func (s *stream) SetWriteDeadline(t time.Time) error {
return s.Pipe.SetWriteDeadline(t)
}

func (s *stream) Read(b []byte) (int, error) {
return s.Pipe.Read(b)
}

// transport will grab message arrival times, wait until that time, and
// then write the message out when it is scheduled to arrive
func (s *stream) transport(proc process.Process) {
Expand All @@ -97,7 +107,7 @@ func (s *stream) transport(proc process.Process) {
// done only when arrival time makes sense.
drainBuf := func() {
if buf.Len() > 0 {
_, err := s.Writer.Write(buf.Bytes())
_, err := s.Pipe.Write(buf.Bytes())
if err != nil {
return
}
Expand Down Expand Up @@ -131,7 +141,7 @@ func (s *stream) transport(proc process.Process) {
drainBuf()

// write this message.
_, err := s.Writer.Write(o.msg)
_, err := s.Pipe.Write(o.msg)
if err != nil {
log.Error("mock_stream", err)
}
Expand Down
43 changes: 10 additions & 33 deletions package.json
Expand Up @@ -98,11 +98,6 @@
"name": "randbo",
"version": "0.0.0"
},
{
"hash": "Qmb1US8uyZeEpMyc56wVZy2cDFdQjNFojAUYVCoo9ieTqp",
"name": "go-stream-muxer",
"version": "1.0.0"
},
{
"hash": "QmeQW4ayVqi7Jjay1SrP2wYydsH9KwSrzQBnqyC25gPFnG",
"name": "go-notifier",
Expand All @@ -114,9 +109,9 @@
"version": "0.0.0"
},
{
"hash": "QmS9en3mcwW2HRSeRabceJEGVxTZF4vEeFm7JHWQwWsb1U",
"hash": "QmVwFjMdejJ8mGVmgyR2mKcUHrvNBDtDsKRT99soVbkFhA",
"name": "go-peerstream",
"version": "1.4.1"
"version": "1.5.0"
},
{
"author": "whyrusleeping",
Expand All @@ -142,24 +137,6 @@
"name": "go-libp2p-secio",
"version": "1.1.0"
},
{
"author": "whyrusleeping",
"hash": "QmSHTSkxXGQgaHWz91oZV3CDy3hmKmDgpjbYRT6niACG4E",
"name": "go-smux-yamux",
"version": "1.1.1"
},
{
"author": "whyrusleeping",
"hash": "QmetupZ62uEdoqNsbZUCgqU3JyfssExBfqBwBhDpjyE6eW",
"name": "go-smux-multistream",
"version": "1.4.0"
},
{
"author": "whyrusleeping",
"hash": "QmfXgTygwsTPyUWPWTAeBK6cFtTdMqmeeqhyhcNMhRpT1g",
"name": "go-smux-spdystream",
"version": "1.1.1"
},
{
"author": "whyrusleeping",
"hash": "QmXXCcQ7CLg5a81Ui9TTR35QcR4y7ZyihxwfjqaHfUVcVo",
Expand Down Expand Up @@ -216,15 +193,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmdysBu77i3YaagNtMAjiCJdeWWvds18ho5XEB784guQ41",
"hash": "QmU3pGGVT1riXp5dBJbNrGpxssVScfvk9236drRHZZbKJ1",
"name": "go-libp2p-net",
"version": "1.5.0"
"version": "1.6.0"
},
{
"author": "whyrusleeping",
"hash": "QmVcNzHewFvmVah1CGqg8NV7nHHsPu19U43YE5b2oqWyBp",
"hash": "QmX4j1JhubdEt4EB1JY1mMKTvJwPZSRzTv3uwh5zaDqyAi",
"name": "go-libp2p-metrics",
"version": "1.5.0"
"version": "1.6.0"
},
{
"author": "whyrusleeping",
Expand All @@ -234,15 +211,15 @@
},
{
"author": "whyrusleeping",
"hash": "QmWf338UyG5DKyemvoFiomDPtkVNHLsw3GAt9XXHX5ZtsM",
"hash": "QmU5qKZsCG1Wg38jwg8XezBdc3fBGMMZjM7YFMAhunC1Yh",
"name": "go-libp2p-host",
"version": "1.1.1"
"version": "1.2.0"
},
{
"author": "whyrusleeping",
"hash": "QmcjMKTqrWgMMCExEnwczefhno5fvx7FHDV63peZwDzHNF",
"hash": "QmU9ePpXRQgGpPpMAm1CsgU9KptrtgZERrVBGB7Ek5cM2D",
"name": "go-libp2p-swarm",
"version": "1.3.3"
"version": "1.4.0"
},
{
"author": "whyrusleeping",
Expand Down

0 comments on commit 1c10ed4

Please sign in to comment.