Skip to content

Commit

Permalink
use NodeAddress everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed Mar 15, 2022
1 parent b03d9d1 commit bc6d694
Show file tree
Hide file tree
Showing 23 changed files with 117 additions and 127 deletions.
27 changes: 17 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Client.scala
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.{Props, _}
import akka.event.Logging.MDC
import akka.io.Tcp.SO.KeepAlive
Expand All @@ -28,14 +26,16 @@ import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.tor.Socks5Connection.{Socks5Connect, Socks5Connected, Socks5Error}
import fr.acinq.eclair.tor.{Socks5Connection, Socks5ProxyParams}
import fr.acinq.eclair.wire.protocol._

import java.net.InetSocketAddress
import scala.concurrent.duration._

/**
* Created by PM on 27/10/2015.
*
*/
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteAddress: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging {
class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, remoteNodeAddress: NodeAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean) extends Actor with DiagnosticActorLogging {

import context.system

Expand All @@ -44,7 +44,14 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],

def receive: Receive = {
case Symbol("connect") =>
val (peerOrProxyAddress, proxyParams_opt) = socks5ProxyParams_opt.map(proxyParams => (proxyParams, Socks5ProxyParams.proxyAddress(remoteAddress, proxyParams))) match {
// note that there is no resolution here, it's easier plain ip addresses, or unresolved tor hostnames
val remoteAddress = remoteNodeAddress match {
case addr: IPv4 => new InetSocketAddress(addr.ipv4, addr.port)
case addr: IPv6 => new InetSocketAddress(addr.ipv6, addr.port)
case addr: Tor2 => InetSocketAddress.createUnresolved(addr.host, addr.port)
case addr: Tor3 => InetSocketAddress.createUnresolved(addr.host, addr.port)
}
val (peerOrProxyAddress, proxyParams_opt) = socks5ProxyParams_opt.map(proxyParams => (proxyParams, Socks5ProxyParams.proxyAddress(remoteNodeAddress, proxyParams))) match {
case Some((proxyParams, Some(proxyAddress))) =>
log.info(s"connecting to SOCKS5 proxy ${str(proxyAddress)}")
(proxyAddress, Some(proxyParams))
Expand All @@ -53,14 +60,14 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
(remoteAddress, None)
}
IO(Tcp) ! Tcp.Connect(peerOrProxyAddress, timeout = Some(20 seconds), options = KeepAlive(true) :: Nil, pullMode = true)
context become connecting(proxyParams_opt)
context become connecting(proxyParams_opt, remoteAddress)
}

def connecting(proxyParams: Option[Socks5ProxyParams]): Receive = {
def connecting(proxyParams: Option[Socks5ProxyParams], remoteAddress: InetSocketAddress): Receive = {
case Tcp.CommandFailed(c: Tcp.Connect) =>
val peerOrProxyAddress = c.remoteAddress
log.info(s"connection failed to ${str(peerOrProxyAddress)}")
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteAddress))
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteNodeAddress))
context stop self

case Tcp.Connected(peerOrProxyAddress, _) =>
Expand All @@ -75,7 +82,7 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
context become {
case Tcp.CommandFailed(_: Socks5Connect) =>
log.info(s"connection failed to ${str(remoteAddress)} via SOCKS5 ${str(proxyAddress)}")
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteAddress))
origin_opt.foreach(_ ! PeerConnection.ConnectionResult.ConnectionFailed(remoteNodeAddress))
context stop self
case Socks5Connected(_) =>
log.info(s"connected to ${str(remoteAddress)} via SOCKS5 proxy ${str(proxyAddress)}")
Expand Down Expand Up @@ -127,13 +134,13 @@ class Client(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams],
switchboard = switchboard,
router = router
))
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteAddress, origin_opt = origin_opt, isPersistent = isPersistent)
peerConnection ! PeerConnection.PendingAuth(connection, remoteNodeId_opt = Some(remoteNodeId), address = remoteNodeAddress, origin_opt = origin_opt, isPersistent = isPersistent)
peerConnection
}
}

object Client {

def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: InetSocketAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent))
def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef, address: NodeAddress, remoteNodeId: PublicKey, origin_opt: Option[ActorRef], isPersistent: Boolean): Props = Props(new Client(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router, address, remoteNodeId, origin_opt, isPersistent))

}
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.io

import java.net.InetSocketAddress

import akka.actor.{Actor, ActorLogging, ActorRef, DeadLetter, Props}
import akka.cluster.Cluster
import akka.cluster.pubsub.DistributedPubSub
Expand All @@ -26,6 +24,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.tor.Socks5ProxyParams
import fr.acinq.eclair.wire.protocol.NodeAddress

class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef) extends Actor with ActorLogging {

Expand Down Expand Up @@ -57,16 +56,16 @@ class ClientSpawner(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyP
log.warning("handling outgoing connection request locally")
self forward req
case _: DeadLetter =>
// we don't care about other dead letters
// we don't care about other dead letters
}
}

object ClientSpawner {

def props(keyPair: KeyPair, socks5ProxyParams_opt: Option[Socks5ProxyParams], peerConnectionConf: PeerConnection.Conf, switchboard: ActorRef, router: ActorRef): Props = Props(new ClientSpawner(keyPair, socks5ProxyParams_opt, peerConnectionConf, switchboard, router))

case class ConnectionRequest(address: InetSocketAddress,
remoteNodeId: PublicKey,
case class ConnectionRequest(remoteNodeId: PublicKey,
address: NodeAddress,
origin: ActorRef,
isPersistent: Boolean) extends RemoteTypes
}
7 changes: 4 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/NodeURI.scala
Expand Up @@ -18,11 +18,12 @@ package fr.acinq.eclair.io

import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.protocol.NodeAddress
import scodec.bits.ByteVector

import scala.util.{Failure, Success, Try}

case class NodeURI(nodeId: PublicKey, address: HostAndPort) {
case class NodeURI(nodeId: PublicKey, address: NodeAddress) {
override def toString: String = s"$nodeId@$address"
}

Expand All @@ -40,8 +41,8 @@ object NodeURI {
@throws[IllegalArgumentException]
def parse(uri: String): NodeURI = {
uri.split("@") match {
case Array(nodeId, address) => (Try(PublicKey(ByteVector.fromValidHex(nodeId))), Try(HostAndPort.fromString(address).withDefaultPort(DEFAULT_PORT))) match {
case (Success(pk), Success(hostAndPort)) => NodeURI(pk, hostAndPort)
case Array(nodeId, address) => (Try(PublicKey(ByteVector.fromValidHex(nodeId))), Try(HostAndPort.fromString(address)).flatMap(hostAndPort => NodeAddress.fromParts(hostAndPort.getHost, hostAndPort.getPortOrDefault(DEFAULT_PORT)))) match {
case (Success(pk), Success(nodeAddress)) => NodeURI(pk, nodeAddress)
case (Failure(_), _) => throw new IllegalArgumentException("Invalid node id")
case (_, Failure(_)) => throw new IllegalArgumentException("Invalid host:port")
}
Expand Down
12 changes: 5 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Expand Up @@ -21,7 +21,6 @@ import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneF
import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
import akka.util.Timeout
import com.google.common.net.HostAndPort
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong, Script}
import fr.acinq.eclair.Features.Wumbo
Expand All @@ -42,7 +41,6 @@ import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.{Error, HasChannelId, HasTemporaryChannelId, LightningMessage, NodeAddress, OnionMessage, RoutingMessage, UnknownMessage, Warning}
import scodec.bits.ByteVector

import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext

/**
Expand Down Expand Up @@ -331,12 +329,12 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: OnChainA

def gotoConnected(connectionReady: PeerConnection.ConnectionReady, channels: Map[ChannelId, ActorRef]): State = {
require(remoteNodeId == connectionReady.remoteNodeId, s"invalid nodeid: $remoteNodeId != ${connectionReady.remoteNodeId}")
log.debug("got authenticated connection to address {}:{}", connectionReady.address.getHostString, connectionReady.address.getPort)
log.debug("got authenticated connection to address {}", connectionReady.address)

if (connectionReady.outgoing) {
// we store the node address upon successful outgoing connection, so we can reconnect later
// any previous address is overwritten
NodeAddress.fromParts(connectionReady.address.getHostString, connectionReady.address.getPort).map(nodeAddress => nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeAddress))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address)
}

// let's bring existing/requested channels online
Expand Down Expand Up @@ -445,7 +443,7 @@ object Peer {
}
case object Nothing extends Data { override def channels = Map.empty }
case class DisconnectedData(channels: Map[FinalChannelId, ActorRef]) extends Data
case class ConnectedData(address: InetSocketAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef]) extends Data {
case class ConnectedData(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, channels: Map[ChannelId, ActorRef]) extends Data {
val connectionInfo: ConnectionInfo = ConnectionInfo(address, peerConnection, localInit, remoteInit)
def localFeatures: Features[InitFeature] = localInit.features
def remoteFeatures: Features[InitFeature] = remoteInit.features
Expand All @@ -457,7 +455,7 @@ object Peer {
case object CONNECTED extends State

case class Init(storedChannels: Set[HasCommitments])
case class Connect(nodeId: PublicKey, address_opt: Option[HostAndPort], replyTo: ActorRef, isPersistent: Boolean) {
case class Connect(nodeId: PublicKey, address_opt: Option[NodeAddress], replyTo: ActorRef, isPersistent: Boolean) {
def uri: Option[NodeURI] = address_opt.map(NodeURI(nodeId, _))
}
object Connect {
Expand All @@ -476,7 +474,7 @@ object Peer {
sealed trait PeerInfoResponse {
def nodeId: PublicKey
}
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[InetSocketAddress], channels: Int) extends PeerInfoResponse
case class PeerInfo(peer: ActorRef, nodeId: PublicKey, state: State, address: Option[NodeAddress], channels: Int) extends PeerInfoResponse
case class PeerNotFound(nodeId: PublicKey) extends PeerInfoResponse { override def toString: String = s"peer $nodeId not found" }

case class PeerRoutingMessage(peerConnection: ActorRef, remoteNodeId: PublicKey, message: RoutingMessage) extends RemoteTypes
Expand Down
Expand Up @@ -32,7 +32,6 @@ import fr.acinq.eclair.{FSMDiagnosticActorLogging, Feature, Features, InitFeatur
import scodec.Attempt
import scodec.bits.ByteVector

import java.net.InetSocketAddress
import scala.concurrent.duration._
import scala.util.Random

Expand Down Expand Up @@ -87,8 +86,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
when(AUTHENTICATING) {
case Event(TransportHandler.HandshakeCompleted(remoteNodeId), d: AuthenticatingData) =>
cancelTimer(AUTH_TIMER)
import d.pendingAuth.address
log.info(s"connection authenticated with $remoteNodeId@${address.getHostString}:${address.getPort} direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"}")
log.info(s"connection authenticated (direction=${if (d.pendingAuth.outgoing) "outgoing" else "incoming"})")
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Authenticated).increment()
switchboard ! Authenticated(self, remoteNodeId)
goto(BEFORE_INIT) using BeforeInitData(remoteNodeId, d.pendingAuth, d.transport, d.isPersistent)
Expand All @@ -104,8 +102,8 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.info(s"using features=$localFeatures")
val localInit = IPAddress(d.pendingAuth.address.getAddress, d.pendingAuth.address.getPort) match {
case Some(remoteAddress) if !d.pendingAuth.outgoing && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
val localInit = d.pendingAuth.address match {
case remoteAddress if !d.pendingAuth.outgoing && NodeAddress.isPublicIPAddress(remoteAddress) => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil), InitTlv.RemoteAddress(remoteAddress)))
case _ => protocol.Init(localFeatures, TlvStream(InitTlv.Networks(chainHash :: Nil)))
}
d.transport ! localInit
Expand All @@ -120,7 +118,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.ReadAck(remoteInit)

log.info(s"peer is using features=${remoteInit.features}, networks=${remoteInit.networks.mkString(",")}")
remoteInit.remoteAddress_opt.foreach(address => log.info("peer reports that our IP address is {} (public={})", address.socketAddress.toString, NodeAddress.isPublicIPAddress(address)))
remoteInit.remoteAddress_opt.foreach(address => log.info("peer reports that our IP address is {} (public={})", address.toString, NodeAddress.isPublicIPAddress(address)))

val featureGraphErr_opt = Features.validateFeatureGraph(remoteInit.features)
if (remoteInit.networks.nonEmpty && remoteInit.networks.intersect(d.localInit.networks).isEmpty) {
Expand Down Expand Up @@ -552,12 +550,12 @@ object PeerConnection {
case object INITIALIZING extends State
case object CONNECTED extends State

case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: InetSocketAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) {
case class PendingAuth(connection: ActorRef, remoteNodeId_opt: Option[PublicKey], address: NodeAddress, origin_opt: Option[ActorRef], transport_opt: Option[ActorRef] = None, isPersistent: Boolean) {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: ByteVector32, features: Features[InitFeature], doSync: Boolean) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: InetSocketAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes

sealed trait ConnectionResult extends RemoteTypes
object ConnectionResult {
Expand All @@ -569,7 +567,7 @@ object PeerConnection {
}

case object NoAddressFound extends ConnectionResult.Failure { override def toString: String = "no address found" }
case class ConnectionFailed(address: InetSocketAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
case class ConnectionFailed(address: NodeAddress) extends ConnectionResult.Failure { override def toString: String = s"connection failed to $address" }
case class AuthenticationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case class InitializationFailed(reason: String) extends ConnectionResult.Failure { override def toString: String = reason }
case class AlreadyConnected(peerConnection: ActorRef, peer: ActorRef) extends ConnectionResult.Failure with HasConnection { override def toString: String = "already connected" }
Expand Down
Expand Up @@ -19,13 +19,11 @@ package fr.acinq.eclair.io
import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.wire.protocol
import fr.acinq.eclair.wire.protocol.UnknownMessage

import java.net.InetSocketAddress
import fr.acinq.eclair.wire.protocol.{NodeAddress, UnknownMessage}

sealed trait PeerEvent

case class ConnectionInfo(address: InetSocketAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init)
case class ConnectionInfo(address: NodeAddress, peerConnection: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init)

case class PeerConnected(peer: ActorRef, nodeId: PublicKey, connectionInfo: ConnectionInfo) extends PeerEvent

Expand Down

0 comments on commit bc6d694

Please sign in to comment.