From 02a0678cc69dad436d4dd9f9c7101926ece0e6ec Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Tue, 24 May 2022 14:29:27 -0700 Subject: [PATCH] feat: add tls termination support to proxy (#428) Adds TLS support to proxy. Currently this is implemented by terminating TLS at the proxy and using plaintext to the backends. --- .github/actions/ca/action.yml | 18 ++++ .github/actions/pingproxy/action.yml | 44 ++++++++++ .github/actions/pingserver/action.yml | 50 +++++++++++ .github/actions/rpc-perf/action.yml | 91 ++++++++++++++++++++ .github/workflows/cargo.yml | 106 +++++++++++++++++++++++ src/rust/core/proxy/src/admin.rs | 46 +--------- src/rust/core/proxy/src/listener.rs | 116 ++++++++++++++++++++++---- src/rust/core/proxy/src/poll.rs | 97 +++++++++++++++++---- src/rust/core/proxy/src/process.rs | 6 +- 9 files changed, 498 insertions(+), 76 deletions(-) create mode 100644 .github/actions/ca/action.yml create mode 100644 .github/actions/pingproxy/action.yml create mode 100644 .github/actions/pingserver/action.yml create mode 100644 .github/actions/rpc-perf/action.yml diff --git a/.github/actions/ca/action.yml b/.github/actions/ca/action.yml new file mode 100644 index 000000000..8afcf5aa7 --- /dev/null +++ b/.github/actions/ca/action.yml @@ -0,0 +1,18 @@ +name: 'Configure and Run Smallstep CA' +description: 'Configure and Run Smallstep CA' +runs: + using: "composite" + steps: + - name: Configure and run + run: | + curl -O -L https://dl.step.sm/gh-release/cli/docs-ca-install/v0.19.0/step-cli_0.19.0_amd64.deb + sudo dpkg -i step-cli_0.19.0_amd64.deb + curl -O -L https://dl.step.sm/gh-release/certificates/docs-ca-install/v0.19.0/step-ca_0.19.0_amd64.deb + sudo dpkg -i step-ca_0.19.0_amd64.deb + mkdir ${HOME}/.step + echo password > ${HOME}/.step/password + step ca init --deployment-type=standalone --name=127.0.0.1 --dns=127.0.0.1 --address=127.0.0.1:443 --provisioner=ci@github.com --password-file=${HOME}/.step/password + sudo step-ca --password-file=${HOME}/.step/password ${HOME}/.step/config/ca.json & + sleep 10 + step ca root --ca-url=127.0.0.1:443 root.crt + shell: bash diff --git a/.github/actions/pingproxy/action.yml b/.github/actions/pingproxy/action.yml new file mode 100644 index 000000000..071dffeb8 --- /dev/null +++ b/.github/actions/pingproxy/action.yml @@ -0,0 +1,44 @@ +name: 'Configure and run pingproxy' +description: 'Configures and runs the pingproxy' +inputs: + tls: + description: 'Enable TLS' + required: false + default: false +runs: + using: "composite" + steps: + - name: Configure + run: echo "$BASE_CONFIG" >> proxy.toml + env: + BASE_CONFIG: | + [admin] + port = "9997" + + [backend] + endpoints = ["127.0.0.1:12321"] + shell: bash + - name: Generate TLS Key/Cert + run: | + if ${{ inputs.tls }}; then + step ca certificate --san=127.0.0.1 --ca-url=127.0.0.1:443 --provisioner-password-file=${HOME}/.step/password localhost proxy.crt proxy.key + fi + shell: bash + - name: Configure TLS + run: if ${{ inputs.tls }}; then echo "$TLS_CONFIG" >> proxy.toml; fi + env: + TLS_CONFIG: | + [tls] + certificate_chain = "root.crt" + certificate = "proxy.crt" + private_key = "proxy.key" + shell: bash + - name: Build pingproxy + run: | + cd pelikan && cargo build --release --bin pelikan_pingproxy_rs + shell: bash + - name: Run pingproxy + run: | + ./pelikan/target/release/pelikan_pingproxy_rs proxy.toml & + sleep 10 + shell: bash diff --git a/.github/actions/pingserver/action.yml b/.github/actions/pingserver/action.yml new file mode 100644 index 000000000..420eb2444 --- /dev/null +++ b/.github/actions/pingserver/action.yml @@ -0,0 +1,50 @@ +name: 'Configure and run pingserver' +description: 'Configures and runs the pingserver' +env: + BASE_CONFIG: | + [admin] + port = "9999" + TLS_CONFIG: | + [tls] + certificate_chain = "root.crt" + certificate = "server.crt" + private_key = "server.key" +inputs: + tls: + description: 'Enable TLS' + required: false + default: false +runs: + using: "composite" + steps: + - name: Configure + run: echo "$BASE_CONFIG" >> server.toml + env: + BASE_CONFIG: | + [admin] + port = "9999" + shell: bash + - name: Generate TLS Key/Cert + run: | + if ${{ inputs.tls }}; then + step ca certificate --san=127.0.0.1 --ca-url=127.0.0.1:443 --provisioner-password-file=${HOME}/.step/password localhost server.crt server.key + fi + shell: bash + - name: Configure TLS + run: if ${{ inputs.tls }}; then echo "$TLS_CONFIG" >> server.toml; fi + env: + TLS_CONFIG: | + [tls] + certificate_chain = "root.crt" + certificate = "server.crt" + private_key = "server.key" + shell: bash + - name: Build pingserver + run: | + cd pelikan && cargo build --release --bin pelikan_pingserver_rs + shell: bash + - name: Run pingserver + run: | + ./pelikan/target/release/pelikan_pingserver_rs server.toml & + sleep 10 + shell: bash diff --git a/.github/actions/rpc-perf/action.yml b/.github/actions/rpc-perf/action.yml new file mode 100644 index 000000000..06bf2e48d --- /dev/null +++ b/.github/actions/rpc-perf/action.yml @@ -0,0 +1,91 @@ +name: 'Run rpc-perf' +description: 'Builds, configures, and runs rpc-perf' +inputs: + port: + description: 'Port number for endpoint' + required: true + default: '12321' + protocol: + description: 'Name of the protocol' + required: true + default: 'memcache' + tls: + description: 'Enable TLS connections to endpoint' + required: false + default: false +runs: + using: "composite" + steps: + - name: Checkout rpc-perf + uses: actions/checkout@v2 + with: + repository: twitter/rpc-perf + path: rpc-perf + - name: Build Cache for rpc-perf + uses: Swatinem/rust-cache@v1 + with: + key: rpc-perf + working-directory: rpc-perf + - name: Build rpc-perf + run: cd rpc-perf && cargo build --release + shell: bash + - name: Configure + run: echo "$BASE_CONFIG" >> client.toml + env: + BASE_CONFIG: | + [general] + protocol = "${{ inputs.protocol }}" + threads = 1 + + [target] + endpoints = ["127.0.0.1:${{ inputs.port }}"] + + [connection] + poolsize = 20 + + [request] + ratelimit = 1000 + shell: bash + - name: Generate TLS Key/Cert + run: | + if ${{ inputs.tls }}; then + step ca certificate --san=127.0.0.1 --ca-url=127.0.0.1:443 --provisioner-password-file=${HOME}/.step/password localhost client.crt client.key + fi + shell: bash + - name: Configure Workload + run: | + if [ ${{ inputs.protocol }} == "memcache" ]; then + echo "$MEMCACHE_WORKLOAD" >> client.toml + elif [ ${{ inputs.protocol }} == "ping" ]; then + echo "$PING_WORKLOAD" >> client.toml + fi + env: + MEMCACHE_WORKLOAD: | + [[keyspace]] + commands = [ + { verb = "get", weight = 8 }, + { verb = "set", weight = 2 } + ] + length = 3 + values = [ + { length = 16 } + ] + PING_WORKLOAD: | + [[keyspace]] + commands = [ + { verb = "ping", weight = 1 }, + ] + shell: bash + - name: Configure TLS + run: if ${{ inputs.tls }}; then echo "$TLS_CONFIG" >> client.toml; fi + env: + TLS_CONFIG: | + [tls] + verify = false + certificate_chain = "root.crt" + certificate = "client.crt" + private_key = "client.key" + shell: bash + - name: Run rpc-perf + run: rpc-perf/target/release/rpc-perf client.toml + shell: bash diff --git a/.github/workflows/cargo.yml b/.github/workflows/cargo.yml index d64c3d823..75481de04 100644 --- a/.github/workflows/cargo.yml +++ b/.github/workflows/cargo.yml @@ -138,3 +138,109 @@ jobs: run: | cargo install cargo-audit cargo audit + + smoketest-pingserver: + name: smoketest-pingserver + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: pelikan + - name: Build Cache for Pelikan + uses: Swatinem/rust-cache@v1 + with: + key: pelikan + working-directory: pelikan + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: ./pelikan/.github/actions/ca + - uses: ./pelikan/.github/actions/pingserver + with: + tls: false + - uses: ./pelikan/.github/actions/rpc-perf + with: + protocol: ping + port: 12321 + tls: false + + smoketest-pingserver-tls: + name: smoketest-pingserver-tls + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: pelikan + - name: Build Cache for Pelikan + uses: Swatinem/rust-cache@v1 + with: + key: pelikan + working-directory: pelikan + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: ./pelikan/.github/actions/ca + - uses: ./pelikan/.github/actions/pingserver + with: + tls: true + - uses: ./pelikan/.github/actions/rpc-perf + with: + protocol: ping + port: 12321 + tls: true + + smoketest-pingproxy: + name: smoketest-pingproxy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: pelikan + - name: Build Cache for Pelikan + uses: Swatinem/rust-cache@v1 + with: + key: pelikan + working-directory: pelikan + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: ./pelikan/.github/actions/ca + - uses: ./pelikan/.github/actions/pingserver + with: + tls: false + - uses: ./pelikan/.github/actions/pingproxy + with: + tls: false + - uses: ./pelikan/.github/actions/rpc-perf + with: + protocol: ping + port: 12322 + tls: false + + smoketest-pingproxy-tls-terminating: + name: smoketest-pingproxy-tls-terminating + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + path: pelikan + - name: Build Cache for Pelikan + uses: Swatinem/rust-cache@v1 + with: + key: pelikan + working-directory: pelikan + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + - uses: ./pelikan/.github/actions/ca + - uses: ./pelikan/.github/actions/pingserver + with: + tls: false + - uses: ./pelikan/.github/actions/pingproxy + with: + tls: true + - uses: ./pelikan/.github/actions/rpc-perf + with: + protocol: ping + port: 12322 + tls: true \ No newline at end of file diff --git a/src/rust/core/proxy/src/admin.rs b/src/rust/core/proxy/src/admin.rs index 461011f50..2f7f3fe6b 100644 --- a/src/rust/core/proxy/src/admin.rs +++ b/src/rust/core/proxy/src/admin.rs @@ -73,7 +73,6 @@ pub struct AdminBuilder { nevent: usize, poll: Poll, timeout: Duration, - ssl_context: Option, parser: AdminRequestParser, log_drain: Box, http_server: Option, @@ -82,9 +81,8 @@ pub struct AdminBuilder { impl AdminBuilder { /// Creates a new `Admin` event loop. - pub fn new( + pub fn new( config: &T, - // ssl_context: Option, mut log_drain: Box, ) -> Result { let config = config.admin(); @@ -101,7 +99,7 @@ impl AdminBuilder { let _ = log_drain.flush(); Error::new(ErrorKind::Other, "failed to create epoll instance") })?; - poll.bind(addr).map_err(|e| { + poll.bind(addr, &Tls::default()).map_err(|e| { error!("{}", e); error!("failed to bind admin tcp listener"); let _ = log_drain.flush(); @@ -115,9 +113,6 @@ impl AdminBuilder { ) })?; - let ssl_context = None; - // let ssl_context = if config.use_tls() { ssl_context } else { None }; - let timeout = std::time::Duration::from_millis(config.timeout() as u64); let nevent = config.nevent(); @@ -145,7 +140,6 @@ impl AdminBuilder { timeout, nevent, poll, - ssl_context, parser: AdminRequestParser::new(), log_drain, http_server, @@ -177,7 +171,6 @@ impl AdminBuilder { nevent: self.nevent, poll: self.poll, timeout: self.timeout, - ssl_context: self.ssl_context, parser: self.parser, log_drain: self.log_drain, http_server: self.http_server, @@ -193,7 +186,6 @@ pub struct Admin { nevent: usize, poll: Poll, timeout: Duration, - ssl_context: Option, parser: AdminRequestParser, log_drain: Box, /// optional http server @@ -253,39 +245,7 @@ impl Admin { /// Repeatedly call accept on the listener fn do_accept(&mut self) { - loop { - match self.poll.accept() { - Ok((stream, _)) => { - // handle TLS if it is configured - if let Some(ssl_context) = &self.ssl_context { - match Ssl::new(ssl_context).map(|v| v.accept(stream)) { - // handle case where we have a fully-negotiated - // TLS stream on accept() - Ok(Ok(tls_stream)) => { - self.add_established_tls_session(tls_stream); - } - // handle case where further negotiation is - // needed - Ok(Err(HandshakeError::WouldBlock(tls_stream))) => { - self.add_handshaking_tls_session(tls_stream); - } - // some other error has occurred and we drop the - // stream - Ok(Err(_)) | Err(_) => { - TCP_ACCEPT_EX.increment(); - } - } - } else { - self.add_plain_session(stream); - }; - } - Err(e) => { - if e.kind() == ErrorKind::WouldBlock { - break; - } - } - } - } + while self.poll.accept().is_ok() {} } /// This is a handler for the stats commands on the legacy admin port. It diff --git a/src/rust/core/proxy/src/listener.rs b/src/rust/core/proxy/src/listener.rs index d92b9c997..e2d05a675 100644 --- a/src/rust/core/proxy/src/listener.rs +++ b/src/rust/core/proxy/src/listener.rs @@ -4,6 +4,7 @@ use crate::*; use config::proxy::ListenerConfig; +use config::TlsConfig; use core::time::Duration; use mio::Waker; use poll::*; @@ -16,6 +17,12 @@ const KB: usize = 1024; const SESSION_BUFFER_MIN: usize = 16 * KB; const SESSION_BUFFER_MAX: usize = 1024 * KB; +static_metrics! { + static LISTENER_EVENT_ERROR: Counter; + static LISTENER_EVENT_READ: Counter; + static LISTENER_EVENT_WRITE: Counter; +} + pub struct ListenerBuilder { addr: SocketAddr, nevent: usize, @@ -24,7 +31,8 @@ pub struct ListenerBuilder { } impl ListenerBuilder { - pub fn new(config: &T) -> Result { + pub fn new(config: &T) -> Result { + let tls_config = config.tls(); let config = config.listener(); let addr = config @@ -34,7 +42,7 @@ impl ListenerBuilder { let timeout = Duration::from_millis(config.timeout() as u64); let mut poll = Poll::new()?; - poll.bind(addr)?; + poll.bind(addr, tls_config)?; Ok(Self { addr, @@ -68,6 +76,81 @@ pub struct Listener { } impl Listener { + /// Handle an event on an existing session + fn handle_session_event(&mut self, event: &Event) { + let token = event.token(); + + // handle error events first + if event.is_error() { + LISTENER_EVENT_ERROR.increment(); + self.handle_error(token); + } + + // handle write events before read events to reduce write + // buffer growth if there is also a readable event + if event.is_writable() { + LISTENER_EVENT_WRITE.increment(); + self.do_write(token); + } + + // read events are handled last + if event.is_readable() { + LISTENER_EVENT_READ.increment(); + let _ = self.do_read(token); + } + + if let Ok(session) = self.poll.get_mut_session(token) { + if session.session.do_handshake().is_ok() { + trace!("handshake complete for session: {:?}", session.session); + if let Ok(session) = self.poll.remove_session(token) { + if self + .connection_queues + .try_send_any(session.session) + .is_err() + { + error!("error sending session to worker"); + TCP_ACCEPT_EX.increment(); + } + } else { + error!("error removing session from poller"); + TCP_ACCEPT_EX.increment(); + } + } else { + trace!("handshake incomplete for session: {:?}", session.session); + } + } + } + + pub fn do_accept(&mut self) { + if let Ok(token) = self.poll.accept() { + match self + .poll + .get_mut_session(token) + .map(|v| v.session.is_handshaking()) + { + Ok(false) => { + if let Ok(session) = self.poll.remove_session(token) { + if self + .connection_queues + .try_send_any(session.session) + .is_err() + { + warn!("rejecting connection, client connection queue is too full"); + } else { + trace!("sending new connection to worker threads"); + } + } + } + Ok(true) => {} + Err(e) => { + warn!("error checking if new session is handshaking: {}", e); + } + } + } + self.poll.reregister(LISTENER_TOKEN); + let _ = self.connection_queues.wake(); + } + pub fn run(mut self) { info!("running listener on: {}", self.addr); @@ -77,27 +160,24 @@ impl Listener { for event in &events { match event.token() { LISTENER_TOKEN => { - // TODO(bmartin): this assumes plaintext connections - while let Ok((stream, _addr)) = self.poll.accept() { - let session = Session::plain_with_capacity( - stream, - SESSION_BUFFER_MIN, - SESSION_BUFFER_MAX, - ); - if self.connection_queues.try_send_any(session).is_err() { - warn!("rejecting connection, client connection queue is too full"); - } else { - trace!("sending new connection to worker threads"); - } - let _ = self.connection_queues.wake(); - } + self.do_accept(); } WAKER_TOKEN => {} - token => { - warn!("listener: unexpected event for token: {}", token.0); + _ => { + self.handle_session_event(event); } } } } } } + +impl EventLoop for Listener { + fn handle_data(&mut self, _token: Token) -> Result<()> { + Ok(()) + } + + fn poll(&mut self) -> &mut Poll { + &mut self.poll + } +} diff --git a/src/rust/core/proxy/src/poll.rs b/src/rust/core/proxy/src/poll.rs index bd5e1959f..26f3d8b13 100644 --- a/src/rust/core/proxy/src/poll.rs +++ b/src/rust/core/proxy/src/poll.rs @@ -5,14 +5,11 @@ //! This module provides common functionality for threads which are based on an //! event loop. +use crate::TCP_ACCEPT_EX; +use common::ssl::*; use mio::event::Source; -use mio::net::TcpListener; -use mio::Events; -use mio::Interest; -use mio::Token; -use mio::Waker; -use session::Session; -use session::TcpStream; +use mio::{Events, Interest, Token, Waker}; +use session::{Session, TcpStream}; use slab::Slab; use std::convert::TryFrom; use std::net::SocketAddr; @@ -22,6 +19,32 @@ use std::time::Duration; pub const LISTENER_TOKEN: Token = Token(usize::MAX - 1); pub const WAKER_TOKEN: Token = Token(usize::MAX); +const KB: usize = 1024; + +const SESSION_BUFFER_MIN: usize = 16 * KB; +const SESSION_BUFFER_MAX: usize = 1024 * KB; + +struct TcpListener { + inner: mio::net::TcpListener, + ssl_context: Option, +} + +impl TcpListener { + pub fn bind(addr: SocketAddr, tls_config: &dyn TlsConfig) -> Result { + let listener = mio::net::TcpListener::bind(addr).map_err(|e| { + error!("{}", e); + std::io::Error::new(std::io::ErrorKind::Other, "failed to start tcp listener") + })?; + + let ssl_context = common::ssl::ssl_context(tls_config)?; + + Ok(Self { + inner: listener, + ssl_context, + }) + } +} + pub struct Poll { listener: Option, poll: mio::Poll, @@ -56,8 +79,12 @@ impl Poll { } /// Bind and begin listening on the provided address. - pub fn bind(&mut self, addr: SocketAddr) -> Result<(), std::io::Error> { - let mut listener = TcpListener::bind(addr).map_err(|e| { + pub fn bind( + &mut self, + addr: SocketAddr, + tls_config: &dyn TlsConfig, + ) -> Result<(), std::io::Error> { + let mut listener = TcpListener::bind(addr, tls_config).map_err(|e| { error!("{}", e); std::io::Error::new(std::io::ErrorKind::Other, "failed to start tcp listener") })?; @@ -65,7 +92,7 @@ impl Poll { // register listener to event loop self.poll .registry() - .register(&mut listener, LISTENER_TOKEN, Interest::READABLE) + .register(&mut listener.inner, LISTENER_TOKEN, Interest::READABLE) .map_err(|e| { error!("{}", e); std::io::Error::new( @@ -88,15 +115,55 @@ impl Poll { self.poll.poll(events, Some(timeout)) } - pub fn accept(&mut self) -> Result<(TcpStream, SocketAddr), std::io::Error> { + pub fn accept(&mut self) -> Result { if let Some(ref mut listener) = self.listener { - let (stream, addr) = listener.accept()?; + let (stream, _addr) = listener.inner.accept()?; // disable Nagle's algorithm let _ = stream.set_nodelay(true); let stream = TcpStream::try_from(stream)?; - Ok((stream, addr)) + + let session = if let Some(ssl_context) = &listener.ssl_context { + match Ssl::new(ssl_context).map(|v| v.accept(stream)) { + // handle case where we have a fully-negotiated + // TLS stream on accept() + Ok(Ok(stream)) => { + Session::tls_with_capacity(stream, SESSION_BUFFER_MIN, SESSION_BUFFER_MAX) + } + // handle case where further negotiation is + // needed + Ok(Err(HandshakeError::WouldBlock(stream))) => { + Session::handshaking_with_capacity( + stream, + SESSION_BUFFER_MIN, + SESSION_BUFFER_MAX, + ) + } + // some other error has occurred and we drop the + // stream + Ok(Err(e)) => { + error!("accept failed: {}", e); + TCP_ACCEPT_EX.increment(); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "accept failed", + )); + } + Err(e) => { + error!("accept failed: {}", e); + TCP_ACCEPT_EX.increment(); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "accept failed", + )); + } + } + } else { + Session::plain_with_capacity(stream, SESSION_BUFFER_MIN, SESSION_BUFFER_MAX) + }; + + self.add_session(session) } else { Err(std::io::Error::new( std::io::ErrorKind::Other, @@ -161,12 +228,14 @@ impl Poll { LISTENER_TOKEN => { if let Some(ref mut listener) = self.listener { if listener + .inner .reregister(self.poll.registry(), LISTENER_TOKEN, Interest::READABLE) .is_err() { warn!("reregister of listener failed, attempting to recover"); - let _ = listener.deregister(self.poll.registry()); + let _ = listener.inner.deregister(self.poll.registry()); if listener + .inner .register(self.poll.registry(), LISTENER_TOKEN, Interest::READABLE) .is_err() { diff --git a/src/rust/core/proxy/src/process.rs b/src/rust/core/proxy/src/process.rs index ca886930d..b14e8b1a6 100644 --- a/src/rust/core/proxy/src/process.rs +++ b/src/rust/core/proxy/src/process.rs @@ -12,6 +12,7 @@ use common::signal::Signal; use config::proxy::{BackendConfig, FrontendConfig, ListenerConfig}; use config::AdminConfig; use config::ServerConfig; +use config::TlsConfig; use crossbeam_channel::bounded; use crossbeam_channel::Sender; use logger::Drain; @@ -41,12 +42,15 @@ where ResponseParser: 'static + Clone + Send + Parse, Response: 'static + Send + Compose, { - pub fn new( + pub fn new( config: T, request_parser: RequestParser, response_parser: ResponseParser, log_drain: Box, ) -> Result { + // initialize the clock + common::time::refresh_clock(); + let admin_builder = AdminBuilder::new(&config, log_drain).unwrap_or_else(|e| { error!("failed to initialize admin: {}", e); std::process::exit(1);