Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mark connections with limits as transient #1890

Merged
merged 9 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class MockConnection implements Connection {
public status: ConnectionStatus
public streams: Stream[]
public tags: string[]
public limited: boolean

private readonly muxer: StreamMuxer
private readonly maConn: MultiaddrConnection
Expand All @@ -63,6 +64,7 @@ class MockConnection implements Connection {
this.tags = []
this.muxer = muxer
this.maConn = maConn
this.limited = false
}

async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
Expand Down
2 changes: 1 addition & 1 deletion packages/interface-compliance-tests/src/mocks/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { mockConnection } from './connection.js'
import type { Libp2pEvents } from '@libp2p/interface'
import type { Connection, MultiaddrConnection } from '@libp2p/interface/connection'
import type { EventEmitter } from '@libp2p/interface/events'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface-internal/upgrader'

export interface MockUpgraderInit {
registrar?: Registrar
Expand Down
7 changes: 7 additions & 0 deletions packages/interface-internal/src/registrar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export interface StreamHandlerOptions {
* How many outgoing streams can be open for this protocol at the same time on each connection (default: 64)
*/
maxOutboundStreams?: number

/**
* If true, allow this protocol to run on limited connections (e.g.
* connections with data or duration limits such as circuit relay
* connections) (default: false)
*/
allowOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
20 changes: 0 additions & 20 deletions packages/interface-internal/src/upgrader/index.ts

This file was deleted.

14 changes: 14 additions & 0 deletions packages/interface/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ export interface NewStreamOptions extends AbortOptions {
* for the protocol
*/
maxOutboundStreams?: number

/**
* Opt-in to running over a limited connection - one that has time/data limits
* placed on it.
*/
allowOnLimitedConnection?: boolean
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
}

export type ConnectionStatus = 'open' | 'closing' | 'closed'
Expand Down Expand Up @@ -239,6 +245,14 @@ export interface Connection {
*/
status: ConnectionStatus

/**
* A limited connection is one that is not expected to be open for very long
* or one that cannot transfer very much data, such as one being used as a
* circuit relay connection. Protocols need to explicitly opt-in to being run
* over limited connections.
*/
limited: boolean

/**
* Create a new stream on this connection and negotiate one of the passed protocols
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/interface/src/stream-handler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ export interface StreamHandlerOptions {
* How many outgoing streams can be open for this protocol at the same time on each connection (default: 64)
*/
maxOutboundStreams?: number

/**
* Opt-in to running over a limited connection - one that has time/data limits
* placed on it.
*/
allowOnLimitedConnection?: boolean
}

export interface StreamHandlerRecord {
Expand Down
6 changes: 6 additions & 0 deletions packages/interface/src/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ export interface UpgraderOptions {
skipEncryption?: boolean
skipProtection?: boolean
muxerFactory?: StreamMuxerFactory

/**
* The passed MultiaddrConnection has limits place on duration and/or data
* transfer amounts so is not expected to be open for very long.
*/
limited?: boolean
}

export interface Upgrader {
Expand Down
12 changes: 8 additions & 4 deletions packages/libp2p/src/circuit-relay/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ class CircuitRelayTransport implements Transport {
localAddr: relayAddr.encapsulate(`/p2p-circuit/p2p/${this.peerId.toString()}`)
})

log('new outbound connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn)
log('new outbound transient connection %a', maConn.remoteAddr)
return await this.upgrader.upgradeOutbound(maConn, {
limited: true
})
} catch (err) {
log.error(`Circuit relay dial to destination ${destinationPeer.toString()} via relay ${connection.remotePeer.toString()} failed`, err)
disconnectOnFailure && await connection.close()
Expand Down Expand Up @@ -380,8 +382,10 @@ class CircuitRelayTransport implements Transport {
localAddr
})

log('new inbound connection %s', maConn.remoteAddr)
await this.upgrader.upgradeInbound(maConn)
log('new inbound transient connection %s', maConn.remoteAddr)
achingbrain marked this conversation as resolved.
Show resolved Hide resolved
await this.upgrader.upgradeInbound(maConn, {
limited: true
})
log('%s connection %a upgraded', 'inbound', maConn.remoteAddr)
}
}
Expand Down
14 changes: 11 additions & 3 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { setMaxListeners } from 'events'
import { type Direction, symbol, type Connection, type Stream, type ConnectionTimeline, type ConnectionStatus } from '@libp2p/interface/connection'
import { symbol } from '@libp2p/interface/connection'
import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions } from '@libp2p/interface/connection'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Multiaddr } from '@multiformats/multiaddr'

Expand All @@ -22,6 +23,7 @@ interface ConnectionInit {
timeline: ConnectionTimeline
multiplexer?: string
encryption?: string
limited?: boolean
}

/**
Expand Down Expand Up @@ -49,6 +51,7 @@ export class ConnectionImpl implements Connection {
public multiplexer?: string
public encryption?: string
public status: ConnectionStatus
public limited: boolean

/**
* User provided tags
Expand All @@ -59,7 +62,7 @@ export class ConnectionImpl implements Connection {
/**
* Reference to the new stream function of the multiplexer
*/
private readonly _newStream: (protocols: string[], options?: AbortOptions) => Promise<Stream>
private readonly _newStream: (protocols: string[], options?: NewStreamOptions) => Promise<Stream>

/**
* Reference to the close function of the raw connection
Expand Down Expand Up @@ -88,6 +91,7 @@ export class ConnectionImpl implements Connection {
this.timeline = init.timeline
this.multiplexer = init.multiplexer
this.encryption = init.encryption
this.limited = init.limited ?? false

this._newStream = newStream
this._close = close
Expand All @@ -110,7 +114,7 @@ export class ConnectionImpl implements Connection {
/**
* Create a new stream from this connection
*/
async newStream (protocols: string | string[], options?: AbortOptions): Promise<Stream> {
async newStream (protocols: string | string[], options?: NewStreamOptions): Promise<Stream> {
if (this.status === 'closing') {
throw new CodeError('the connection is being closed', 'ERR_CONNECTION_BEING_CLOSED')
}
Expand All @@ -123,6 +127,10 @@ export class ConnectionImpl implements Connection {
protocols = [protocols]
}

if (this.limited && options?.allowOnLimitedConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

const stream = await this._newStream(protocols, options)

stream.direction = 'outbound'
Expand Down
4 changes: 2 additions & 2 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { DefaultUpgrader } from './upgrader.js'
import type { Components } from './components.js'
import type { Libp2p, Libp2pInit, Libp2pOptions } from './index.js'
import type { Libp2pEvents, PendingDial, ServiceMap, AbortOptions } from '@libp2p/interface'
import type { Connection, Stream } from '@libp2p/interface/connection'
import type { Connection, NewStreamOptions, Stream } from '@libp2p/interface/connection'
import type { KeyChain } from '@libp2p/interface/keychain'
import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
Expand Down Expand Up @@ -283,7 +283,7 @@ export class Libp2pNode<T extends ServiceMap = Record<string, unknown>> extends
return this.components.connectionManager.openConnection(peer, options)
}

async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: AbortOptions = {}): Promise<Stream> {
async dialProtocol (peer: PeerId | Multiaddr | Multiaddr[], protocols: string | string[], options: NewStreamOptions = {}): Promise<Stream> {
if (protocols == null) {
throw new CodeError('no protocols were provided to open a stream', codes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
}
Expand Down
19 changes: 14 additions & 5 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerStore } from '@libp2p/interface/peer-store'
import type { StreamMuxer, StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface/transport'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { Upgrader, UpgraderOptions } from '@libp2p/interface-internal/upgrader'
import type { Duplex, Source } from 'it-stream-types'

const log = logger('libp2p:upgrader')
Expand All @@ -32,6 +32,7 @@ interface CreateConnectionOptions {
upgradedConn: Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>
remotePeer: PeerId
muxerFactory?: StreamMuxerFactory
limited?: boolean
}

interface OnStreamOptions {
Expand Down Expand Up @@ -251,7 +252,8 @@ export class DefaultUpgrader implements Upgrader {
maConn,
upgradedConn,
muxerFactory,
remotePeer
remotePeer,
limited: opts?.limited
})
} finally {
this.components.connectionManager.afterUpgradeInbound()
Expand Down Expand Up @@ -348,7 +350,8 @@ export class DefaultUpgrader implements Upgrader {
maConn,
upgradedConn,
muxerFactory,
remotePeer
remotePeer,
limited: opts?.limited
})
}

Expand All @@ -362,7 +365,8 @@ export class DefaultUpgrader implements Upgrader {
maConn,
upgradedConn,
remotePeer,
muxerFactory
muxerFactory,
limited
} = opts

let muxer: StreamMuxer | undefined
Expand Down Expand Up @@ -541,6 +545,7 @@ export class DefaultUpgrader implements Upgrader {
timeline: maConn.timeline,
multiplexer: muxer?.protocol,
encryption: cryptoProtocol,
limited,
newStream: newStream ?? errConnectionNotMultiplexed,
getStreams: () => { if (muxer != null) { return muxer.streams } else { return [] } },
close: async (options?: AbortOptions) => {
Expand Down Expand Up @@ -571,7 +576,11 @@ export class DefaultUpgrader implements Upgrader {
*/
_onStream (opts: OnStreamOptions): void {
const { connection, stream, protocol } = opts
const { handler } = this.components.registrar.getHandler(protocol)
const { handler, options } = this.components.registrar.getHandler(protocol)

if (connection.limited && options.allowOnLimitedConnection !== true) {
throw new CodeError('Cannot open protocol stream on transient connection', 'ERR_TRANSIENT_CONNECTION')
}

handler({ connection, stream })
}
Expand Down
88 changes: 88 additions & 0 deletions packages/libp2p/test/circuit-relay/relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Circuit } from '@multiformats/mafmt'
import { multiaddr } from '@multiformats/multiaddr'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { pipe } from 'it-pipe'
import { pbStream } from 'it-protobuf-stream'
import defer from 'p-defer'
import pWaitFor from 'p-wait-for'
Expand Down Expand Up @@ -671,6 +672,93 @@ describe('circuit-relay', () => {
expect(events[0].detail.remotePeer.toString()).to.equal(remote.peerId.toString())
expect(events[1].detail.remotePeer.toString()).to.equal(relay1.peerId.toString())
})

it('should mark a relayed connection as limited', async () => {
// discover relay and make reservation
const connectionToRelay = await remote.dial(relay1.getMultiaddrs()[0])

// connection to relay should not be marked limited
expect(connectionToRelay).to.have.property('limited', false)

await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection to remote through relay should be marked limited
expect(connection).to.have.property('limited', true)
})

it('should not open streams on a limited connection', async () => {
// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked limited
expect(connection).to.have.property('limited', true)

await expect(connection.newStream('/my-protocol/1.0.0'))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
})

it('should not allow incoming streams on a limited connection', async () => {
const protocol = '/my-protocol/1.0.0'

// remote registers handler, disallow running over limited streams
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
allowOnLimitedConnection: false
})

// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked limited
expect(connection).to.have.property('limited', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
allowOnLimitedConnection: false
}))
.to.eventually.be.rejected.with.property('code', 'ERR_TRANSIENT_CONNECTION')
})

it('should open streams on a limited connection when told to do so', async () => {
const protocol = '/my-protocol/1.0.0'

// remote registers handler, allow running over limited streams
await remote.handle(protocol, ({ stream }) => {
void pipe(stream, stream)
}, {
allowOnLimitedConnection: true
})

// discover relay and make reservation
await remote.dial(relay1.getMultiaddrs()[0])
await usingAsRelay(remote, relay1)

// dial the remote through the relay
const ma = getRelayAddress(remote)
const connection = await local.dial(ma)

// connection should be marked limited
expect(connection).to.have.property('limited', true)

await expect(connection.newStream('/my-protocol/1.0.0', {
allowOnLimitedConnection: true
}))
.to.eventually.be.ok()
})
})

describe('flows with data limit', () => {
Expand Down