Skip to content

Commit

Permalink
Network Resource Manager interface (#229)
Browse files Browse the repository at this point in the history
* add resource manager interfaces

* add scope accessors to streams and conns

* add ResourceManager accessor to the Network interface

* allow initially unattached streams.

* introduce service scopes, canonicalize ownership interface through setters

* make system scope explicit

* make memory stat an int64

* make the system scope a generic resource scope, introduce the DMZ

* fix typo

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* fix typo

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* rename DMZ to transient scope, remove OpenConnection from PeerScope

* remove ncopy param from GrowBuffer

* remove protocols from OpenStream

The stream is unnegotiated state until the actual protocol has been determined.

* document nil receiver contract, fix protocol scope protocol accessor method

* remove nil receiver contract requirement

* flesh out stat struct

* turn resource manager scope accessors into viewers

* interface refiniments

1. Introduce transactions in all scopes
2. Limit the view of stream/connection scope for users, to avoid the Done footgun
3. Move OpenStream to the resource manager

* add Buffer interface

* fix typo

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* fix typo

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* fix typo

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* rename user scopes to plain names, management scopes as such

* rename BeginTxn to BeginTransaction

* RIP Buffers

* make ErrResourceLimitExceeded a temporary error; move rcmgr errors with the other errors.

* unexport TemporaryError

* null resource manager stub

* unexport the null stubs, make entry point a variable

* don't rely on typed nils but instead use actual null object instances

So that we don't confuse the hell out of everyone!

* add Scope to the CapableConn interface

* rename ConnectionScope to ConnScope for consistency

* fix typo

* rename ConnectionManagementScope to ConnManagementScope

* add the ConnManagementScope to Upgrader.Upgrade

* fix argument name

* godocs for ResourceManager

* introduce MemoryStatus indicator in ReserveMemory

* use uint8 for MemoryStatus

Co-authored-by: Marten Seemann <martenseemann@gmail.com>

* rework reservation interface to pass priority instead of returning memory status

so that we don't have to undo reservations if there is too much pressure.

* improve comment

* fix typo

* export the NullScope

* Stream.SetProtocol can return an error

It needs to attach the protocol to the protocol scope, which may fail.

* merge the mux package into network

* pass the PeerScope to Multiplexer.NetConn

* Update network/rcmgr.go

Co-authored-by: raulk <raul@protocol.ai>

* Update network/rcmgr.go

Co-authored-by: raulk <raul@protocol.ai>

* Update network/rcmgr.go

Co-authored-by: raulk <raul@protocol.ai>

* Update network/rcmgr.go

Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>

* remove reference to deprecated mux.MuxedConn

* rename transaction to span

* indicate bytes in ReserveMemory

* break ResourceManager View methods into Viewer interface.

* add experimental interface warning

Co-authored-by: Marten Seemann <martenseemann@gmail.com>
Co-authored-by: raulk <raul@protocol.ai>
Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>
  • Loading branch information
4 people committed Jan 17, 2022
1 parent ecd8d78 commit feb4853
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 98 deletions.
100 changes: 9 additions & 91 deletions core/mux/mux.go
Expand Up @@ -5,99 +5,17 @@
package mux

import (
"context"
"errors"
"io"
"net"
"time"
"github.com/libp2p/go-libp2p-core/network"
)

// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")
// Deprecated: use network.ErrReset instead.
var ErrReset = network.ErrReset

// Stream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
io.Writer
// Deprecated: use network.MuxedStream instead.
type MuxedStream = network.MuxedStream

// Close closes the stream.
//
// * Any buffered data for writing will be flushed.
// * Future reads will fail.
// * Any in-progress reads/writes will be interrupted.
//
// Close may be asynchronous and _does not_ guarantee receipt of the
// data.
//
// Close closes the stream for both reading and writing.
// Close is equivalent to calling `CloseRead` and `CloseWrite`. Importantly, Close will not wait for any form of acknowledgment.
// If acknowledgment is required, the caller must call `CloseWrite`, then wait on the stream for a response (or an EOF),
// then call Close() to free the stream object.
//
// When done with a stream, the user must call either Close() or `Reset()` to discard the stream, even after calling `CloseRead` and/or `CloseWrite`.
io.Closer
// Deprecated: use network.MuxedConn instead.
type MuxedConn = network.MuxedConn

// CloseWrite closes the stream for writing but leaves it open for
// reading.
//
// CloseWrite does not free the stream, users must still call Close or
// Reset.
CloseWrite() error

// CloseRead closes the stream for reading but leaves it open for
// writing.
//
// When CloseRead is called, all in-progress Read calls are interrupted with a non-EOF error and
// no further calls to Read will succeed.
//
// The handling of new incoming data on the stream after calling this function is implementation defined.
//
// CloseRead does not free the stream, users must still call Close or
// Reset.
CloseRead() error

// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
Reset() error

SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

// NoopHandler do nothing. Resets streams as soon as they are opened.
var NoopHandler = func(s MuxedStream) { s.Reset() }

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
// A MuxedConn allows a single net.Conn connection to carry many logically
// independent bidirectional streams of binary data.
//
// Together with network.ConnSecurity, MuxedConn is a component of the
// transport.CapableConn interface, which represents a "raw" network
// connection that has been "upgraded" to support the libp2p capabilities
// of secure communication and stream multiplexing.
type MuxedConn interface {
// Close closes the stream muxer and the the underlying net.Conn.
io.Closer

// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool

// OpenStream creates a new stream.
OpenStream(context.Context) (MuxedStream, error)

// AcceptStream accepts a stream opened by the other side.
AcceptStream() (MuxedStream, error)
}

// Multiplexer wraps a net.Conn with a stream multiplexing
// implementation and returns a MuxedConn that supports opening
// multiple streams over the underlying net.Conn
type Multiplexer interface {

// NewConn constructs a new connection
NewConn(c net.Conn, isServer bool) (MuxedConn, error)
}
// Deprecated: use network.Multiplexer instead.
type Multiplexer = network.Multiplexer
8 changes: 8 additions & 0 deletions core/network/conn.go
Expand Up @@ -20,6 +20,7 @@ type Conn interface {
ConnSecurity
ConnMultiaddrs
ConnStat
ConnScoper

// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
Expand Down Expand Up @@ -65,3 +66,10 @@ type ConnStat interface {
// Stat stores metadata pertaining to this conn.
Stat() ConnStats
}

// ConnScoper is the interface that one can mix into a connection interface to give it a resource
// management scope
type ConnScoper interface {
// Scope returns the user view of this connection's resource scope
Scope() ConnScope
}
21 changes: 20 additions & 1 deletion core/network/errors.go
@@ -1,6 +1,17 @@
package network

import "errors"
import (
"errors"
"net"
)

type temporaryError string

func (e temporaryError) Error() string { return string(e) }
func (e temporaryError) Temporary() bool { return true }
func (e temporaryError) Timeout() bool { return false }

var _ net.Error = temporaryError("")

// ErrNoRemoteAddrs is returned when there are no addresses associated with a peer during a dial.
var ErrNoRemoteAddrs = errors.New("no remote addresses")
Expand All @@ -12,3 +23,11 @@ var ErrNoConn = errors.New("no usable connection to peer")
// ErrTransientConn is returned when attempting to open a stream to a peer with only a transient
// connection, without specifying the UseTransient option.
var ErrTransientConn = errors.New("transient connection to peer")

// ErrResourceLimitExceeded is returned when attempting to perform an operation that would
// exceed system resource limits.
var ErrResourceLimitExceeded = temporaryError("resource limit exceeded")

// ErrResourceScopeClosed is returned when attemptig to reserve resources in a closed resource
// scope.
var ErrResourceScopeClosed = errors.New("resource scope closed")
95 changes: 95 additions & 0 deletions core/network/mux.go
@@ -0,0 +1,95 @@
package network

import (
"context"
"errors"
"io"
"net"
"time"
)

// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")

// MuxedStream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
io.Writer

// Close closes the stream.
//
// * Any buffered data for writing will be flushed.
// * Future reads will fail.
// * Any in-progress reads/writes will be interrupted.
//
// Close may be asynchronous and _does not_ guarantee receipt of the
// data.
//
// Close closes the stream for both reading and writing.
// Close is equivalent to calling `CloseRead` and `CloseWrite`. Importantly, Close will not wait for any form of acknowledgment.
// If acknowledgment is required, the caller must call `CloseWrite`, then wait on the stream for a response (or an EOF),
// then call Close() to free the stream object.
//
// When done with a stream, the user must call either Close() or `Reset()` to discard the stream, even after calling `CloseRead` and/or `CloseWrite`.
io.Closer

// CloseWrite closes the stream for writing but leaves it open for
// reading.
//
// CloseWrite does not free the stream, users must still call Close or
// Reset.
CloseWrite() error

// CloseRead closes the stream for reading but leaves it open for
// writing.
//
// When CloseRead is called, all in-progress Read calls are interrupted with a non-EOF error and
// no further calls to Read will succeed.
//
// The handling of new incoming data on the stream after calling this function is implementation defined.
//
// CloseRead does not free the stream, users must still call Close or
// Reset.
CloseRead() error

// Reset closes both ends of the stream. Use this to tell the remote
// side to hang up and go away.
Reset() error

SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
// A MuxedConn allows a single net.Conn connection to carry many logically
// independent bidirectional streams of binary data.
//
// Together with network.ConnSecurity, MuxedConn is a component of the
// transport.CapableConn interface, which represents a "raw" network
// connection that has been "upgraded" to support the libp2p capabilities
// of secure communication and stream multiplexing.
type MuxedConn interface {
// Close closes the stream muxer and the the underlying net.Conn.
io.Closer

// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool

// OpenStream creates a new stream.
OpenStream(context.Context) (MuxedStream, error)

// AcceptStream accepts a stream opened by the other side.
AcceptStream() (MuxedStream, error)
}

// Multiplexer wraps a net.Conn with a stream multiplexing
// implementation and returns a MuxedConn that supports opening
// multiple streams over the underlying net.Conn
type Multiplexer interface {
// NewConn constructs a new connection
NewConn(c net.Conn, isServer bool, scope PeerScope) (MuxedConn, error)
}
3 changes: 3 additions & 0 deletions core/network/network.go
Expand Up @@ -145,6 +145,9 @@ type Network interface {
// listens. It expands "any interface" addresses (/ip4/0.0.0.0, /ip6/::) to
// use the known local interfaces.
InterfaceListenAddresses() ([]ma.Multiaddr, error)

// ResourceManager returns the ResourceManager associated with this network
ResourceManager() ResourceManager
}

// Dialer represents a service that can dial out to peers
Expand Down

0 comments on commit feb4853

Please sign in to comment.