Skip to content

Commit

Permalink
Fix channel leaks (project-oak#1872)
Browse files Browse the repository at this point in the history
  • Loading branch information
rbehjati committed Feb 12, 2021
1 parent 65e49bb commit 0fbde16
Show file tree
Hide file tree
Showing 19 changed files with 118 additions and 48 deletions.
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ This command generates a signature file `signature.sign` containing a public
key, a signature of the Wasm module SHA-256 hash (also encoded with PEM) and a
Wasm module SHA-256 hash itself.

Wasm module signatures are specified as part of the Oak apllication manifest.
Wasm module signatures are specified as part of the Oak application manifest.
The application manifest can contain locations of signature manifest files. Each
signature manifest file contains the locations of signature files for signed Oak
Wasm modules. Also, since each module can be signed by multiple entities, each
Expand Down
2 changes: 1 addition & 1 deletion examples/aggregator/client/android/cpp/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ JNIEXPORT void JNICALL Java_com_google_oak_aggregator_MainActivity_createChannel
// The particular value corresponds to the hash on the `aggregator.wasm` line in
// https://github.com/project-oak/oak/blob/hashes/reproducibility_index.
oak::label::Label label = oak::WebAssemblyModuleHashLabel(
absl::HexStringToBytes("3fe9708f70d8b63c51ed9c179928f7a48e2d92fcd90e6a4bf87e1572838e2975"));
absl::HexStringToBytes("6ec12d5b2631efc2f30b6377a0ad0075e432165752dc360f5354bf9eebc535a7"));
kChannel = Aggregator::NewStub(oak::ApplicationClient::CreateChannel(
address, oak::ApplicationClient::GetTlsChannelCredentials(ca_cert), label));
JNI_LOG("gRPC channel has been created");
Expand Down
2 changes: 1 addition & 1 deletion examples/aggregator/client/cpp/aggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ int main(int argc, char** argv) {
// https://github.com/project-oak/oak/blob/hashes/reproducibility_index.
// TODO(#1674): Add appropriate TLS endpoint tag to the label as well.
oak::label::Label label = oak::WebAssemblyModuleHashLabel(
absl::HexStringToBytes("3fe9708f70d8b63c51ed9c179928f7a48e2d92fcd90e6a4bf87e1572838e2975"));
absl::HexStringToBytes("6ec12d5b2631efc2f30b6377a0ad0075e432165752dc360f5354bf9eebc535a7"));
// Connect to the Oak Application.
auto stub = Aggregator::NewStub(oak::ApplicationClient::CreateChannel(
address, oak::ApplicationClient::GetTlsChannelCredentials(ca_cert), label));
Expand Down
2 changes: 1 addition & 1 deletion examples/aggregator/config.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
grpc_server_listen_address = "[::]:8080"
backend_server_address = "https://localhost:8888"
aggregator_module_hash = "3fe9708f70d8b63c51ed9c179928f7a48e2d92fcd90e6a4bf87e1572838e2975"
aggregator_module_hash = "6ec12d5b2631efc2f30b6377a0ad0075e432165752dc360f5354bf9eebc535a7"
2 changes: 1 addition & 1 deletion examples/aggregator/oak_app_manifest.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name = "aggregator"

[modules]
app = { external = { url = "https://storage.googleapis.com/oak-modules/aggregator/3fe9708f70d8b63c51ed9c179928f7a48e2d92fcd90e6a4bf87e1572838e2975", sha256 = "3fe9708f70d8b63c51ed9c179928f7a48e2d92fcd90e6a4bf87e1572838e2975" } }
app = { external = { url = "https://storage.googleapis.com/oak-modules/aggregator/6ec12d5b2631efc2f30b6377a0ad0075e432165752dc360f5354bf9eebc535a7", sha256 = "6ec12d5b2631efc2f30b6377a0ad0075e432165752dc360f5354bf9eebc535a7" } }
6 changes: 4 additions & 2 deletions examples/chat/module/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl oak::CommandHandler for Router {
// The router node has a public confidentiality label, and therefore cannot read the
// contents of the request of the invocation (unless it happens to be public as well), but
// it can always inspect its label.
match (&command.receiver, &command.sender) {
let result = match (&command.receiver, &command.sender) {
(Some(receiver), Some(sender)) => {
let label = receiver.label()?;
let grpc_response_writer = oak::grpc::ChannelResponseWriter::new(sender.clone());
Expand Down Expand Up @@ -150,7 +150,9 @@ impl oak::CommandHandler for Router {
_ => {
anyhow::bail!("received malformed gRPC invocation");
}
}
};
command.close()?;
result
}
}

Expand Down
6 changes: 4 additions & 2 deletions examples/hello_world/module/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ impl CommandHandler for Router {
},
)
.context("Couldn't create handler node")?;
handler_invocation_sender
let result = handler_invocation_sender
.send(&invocation)
.context("Couldn't send invocation to handler node")
.context("Couldn't send invocation to handler node");
invocation.close()?;
result
}
}
3 changes: 2 additions & 1 deletion examples/http_server/module/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ impl CommandHandler for RedirectHelper {
type Command = RedirectInvocation;

fn handle_command(&mut self, invocation: RedirectInvocation) -> anyhow::Result<()> {
let request = invocation.request_receiver.unwrap().receive()?;
let request = invocation.request_receiver.as_ref().unwrap().receive()?;
invocation.request_receiver.unwrap().close()?;
let client_invocation = invocation.http_invocation_source.unwrap();
let uri = request.uri.parse::<http::Uri>()?;

Expand Down
3 changes: 3 additions & 0 deletions examples/private_set_intersection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## To update this example

This example requires a valid signature of the `handler` module. So, whenever
the code is modified, the wasm module and the signature must be regenerated:

1. Build the example, including the Wasm module
1. Push the module to GS via
`./scripts/push_example -e private_set_intersection`
Expand Down
2 changes: 1 addition & 1 deletion examples/private_set_intersection/oak_app_manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ signature_manifests = [
[modules]
app = { path = "examples/private_set_intersection/bin/private_set_intersection.wasm" }
# TODO(865): Use locally built module once reproducibility is fixed.
handler = { external = { url = "https://storage.googleapis.com/oak-modules/private_set_intersection_handler/ab7e2c01769472c9283d014285343a5e357f0b313372bc4f24b41d0ca5013549", sha256 = "ab7e2c01769472c9283d014285343a5e357f0b313372bc4f24b41d0ca5013549" } }
handler = { external = { url = "https://storage.googleapis.com/oak-modules/private_set_intersection_handler/f04989bf7db987b00a8a2f4f1ea72b8d415392749b9a44660cb5e1c52135c94e", sha256 = "f04989bf7db987b00a8a2f4f1ea72b8d415392749b9a44660cb5e1c52135c94e" } }
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ f41SClNtR4i46v2Tuh1fQLbt/ZqRr1lENajCW92jyP4=
-----END PUBLIC KEY-----

-----BEGIN SIGNATURE-----
6tIedbmPjPq9Awwgdwktfs0UZA0YYex1luPGuoRxuv/UYVZATPuPlq11hqnRZ+JH
tMw6TtvrFALN/hKRXJcSBw==
FYdRH9iIqQSIpcZFFkenfFMtSQGteYGfB4fPlSpcMAugNa+hq7sdMgyzVYef8XJ+
xsZQiYgeHuqbA2CPYHDTBA==
-----END SIGNATURE-----

-----BEGIN HASH-----
q34sAXaUcskoPQFChTQ6XjV/CzEzcrxPJLQdDKUBNUk=
8EmJv325h7AKii9PHqcrjUFTknSbmkRmDLXhxSE1yU4=
-----END HASH-----
6 changes: 4 additions & 2 deletions examples/translator/module/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ impl CommandHandler for Router {
self.init.clone(),
)
.context("Couldn't create handler node")?;
handler_invocation_sender
let ret = handler_invocation_sender
.send(&invocation)
.context("Couldn't send invocation to handler node")
.context("Couldn't send invocation to handler node");
invocation.close()?;
ret
}
}

Expand Down
6 changes: 4 additions & 2 deletions examples/trusted_database/module/rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ impl CommandHandler for Router {
self.init.clone(),
)
.context("Couldn't create handler node")?;
handler_invocation_sender
let result = handler_invocation_sender
.send(&invocation)
.context("Couldn't send invocation to handler node")
.context("Couldn't send invocation to handler node");
invocation.close()?;
result
}
}

Expand Down
78 changes: 56 additions & 22 deletions oak_runtime/src/node/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use oak_abi::{
proto::oak::application::HttpServerConfiguration,
OakStatus,
};
use oak_io::{handle::ReadHandle, OakError, Receiver};
use oak_io::{
handle::{ReadHandle, WriteHandle},
OakError, Receiver,
};
use oak_services::proto::oak::encap::{HeaderMap, HttpRequest, HttpResponse};
use std::{io, net::SocketAddr, pin::Pin};
use tokio::{
Expand Down Expand Up @@ -226,27 +229,20 @@ impl Node for HttpServerNode {
// At start-of-day we need/expect to receive a write handle for an invocation channel
// to use for all subsequent activity.
info!("{}: Waiting for invocation channel", self.node_name);
let startup_receiver = Receiver::<HttpInvocationSender>::new(ReadHandle {
let read_handle = ReadHandle {
handle: startup_handle,
});
let invocation_channel =
match startup_receiver
.receive(&runtime)
.and_then(|invocation_sender| {
invocation_sender
.sender
.ok_or(OakError::OakStatus(OakStatus::ErrBadHandle))
}) {
Ok(sender) => sender.handle.handle,
Err(status) => {
error!(
"Failed to retrieve invocation channel write handle: {:?}",
status
);
return;
}
};
if let Err(err) = startup_receiver.close(&runtime) {
};
let invocation_channel = match get_invocation_channel(&runtime, read_handle) {
Ok(writer) => writer,
Err(status) => {
error!(
"Failed to retrieve invocation channel write handle: {:?}",
status
);
return;
}
};
if let Err(err) = runtime.channel_close(startup_handle) {
error!(
"Failed to close initial inbound channel {}: {:?}",
startup_handle, err
Expand Down Expand Up @@ -274,6 +270,30 @@ impl Node for HttpServerNode {
}
}

/// Reads the [`WriteHandle`] (to be used for sending new invocations) from a startup channel.
/// Returns an error if the startup channel couldn't be read, or if the initial message is
/// invalid (it must be an encoded [`HttpInvocationSender`]).
fn get_invocation_channel(
runtime: &RuntimeProxy,
startup_handle: ReadHandle,
) -> Result<WriteHandle, OakError> {
let startup_receiver = Receiver::<HttpInvocationSender>::new(startup_handle);
let invocation_channel = startup_receiver.receive(&runtime)?;
match &invocation_channel.sender {
Some(invocation_sender) => {
info!(
"Invocation channel write handle received: {}",
invocation_sender.handle.handle
);
Ok(invocation_sender.handle)
}
None => {
error!("Couldn't receive the invocation sender.");
Err(OakError::OakStatus(OakStatus::ErrBadHandle))
}
}
}

fn create_async_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
// Use simple scheduler that runs all tasks on the current-thread.
Expand All @@ -295,7 +315,7 @@ struct HttpRequestHandler {
/// Reference to the Runtime in the context of this HTTP server pseudo-Node.
runtime: RuntimeProxy,
/// Channel handle used for writing HTTP invocations.
invocation_channel: oak_abi::Handle,
invocation_channel: WriteHandle,
}

impl HttpRequestHandler {
Expand Down Expand Up @@ -529,6 +549,20 @@ impl HttpResponseReceiver {
}
}

impl Drop for HttpResponseReceiver {
fn drop(&mut self) {
if let Err(err) = self
.runtime
.channel_close(self.response_receiver.handle.handle)
{
error!(
"Failed to close response channel {}: {:?}",
self.response_receiver.handle.handle, err
);
}
}
}

/// Create an instance of Oak HttpRequest from the given hyper Request.
async fn to_oak_http_request(req: Request<Body>) -> anyhow::Result<HttpRequest> {
let uri = req.uri().to_string();
Expand Down
2 changes: 1 addition & 1 deletion oak_runtime/src/node/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ impl Node for ClientTesterNode {
.expect("Couldn't insert HTTP request in the pipe");

// send the invocation to the HTTP client pseudo-node
pipe.send_invocation(&runtime, invocation_sender.handle.handle)
pipe.send_invocation(&runtime, invocation_sender.handle)
.expect("Couldn't send the invocation");

// wait for the response to come
Expand Down
6 changes: 2 additions & 4 deletions oak_runtime/src/node/http/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,14 @@ impl Pipe {
pub fn send_invocation(
&self,
runtime: &RuntimeProxy,
invocation_channel: oak_abi::Handle,
invocation_channel: WriteHandle,
) -> anyhow::Result<()> {
// Create an invocation containing request-specific channels.
let invocation = HttpInvocation {
receiver: Some(self.request_receiver.clone()),
sender: Some(self.response_sender.clone()),
};
let invocation_sender = crate::io::Sender::new(WriteHandle {
handle: invocation_channel,
});
let invocation_sender = crate::io::Sender::new(invocation_channel);
invocation_sender
.send_with_downgrade(invocation, runtime)
.context("Couldn't write the invocation message")
Expand Down
14 changes: 14 additions & 0 deletions sdk/rust/oak/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
io::{ReceiverExt, SenderExt},
OakError, OakStatus,
};
use anyhow::Context;
use log::{error, warn};
use oak_abi::label::Label;
use oak_services::proto::google::rpc;
Expand All @@ -37,6 +38,19 @@ pub mod server;
pub type Result<T> = std::result::Result<T, rpc::Status>;
pub type Invocation = crate::proto::oak::invocation::GrpcInvocation;

impl Invocation {
pub fn close(self) -> anyhow::Result<()> {
self.receiver
.expect("Couldn't get receiver")
.close()
.context("Couldn't close the receiver")?;
self.sender
.expect("Couldn't get sender")
.close()
.context("Couldn't close the sender")
}
}

/// Helper to create a gRPC status object.
pub fn build_status(code: rpc::Code, msg: &str) -> rpc::Status {
rpc::Status {
Expand Down
17 changes: 14 additions & 3 deletions sdk/rust/oak/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ pub fn entrypoint_node_create<
) -> Result<Sender<Command>, oak_io::OakError> {
let node_config = &crate::node_config::wasm(wasm_module_name, T::ENTRYPOINT_NAME);
let init_sender = node_create(name, label, node_config)?;
send_init(init_sender, init, label)
let result = send_init(&init_sender, init, label);
init_sender.close()?;
result
}

/// Sends an init message over the provided [`Sender`], which is consumed by this method, and
Expand Down Expand Up @@ -154,10 +156,19 @@ pub fn forward_invocation(
// Check if request label is valid in the context of invocation sender.
if request_label.flows_to(&sender_label) {
// Forward invocation through invocation sender.
invocation_sender
let result = invocation_sender
.send(&invocation)
.context("Couldn't forward invocation")
.context("Couldn't forward invocation");
// Close the channels.
invocation.close()?;
result
} else {
// Close the receiver channel separately.
invocation
.receiver
.expect("Couldn't get receiver")
.close()
.expect("Couldn't close the receiver");
// Return an error through `response_sender`.
let grpc_response_writer =
crate::grpc::ChannelResponseWriter::new(response_sender.clone());
Expand Down
1 change: 1 addition & 0 deletions sdk/rust/oak/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ macro_rules! entrypoint_command_handler_init {
use ::oak::io::ReceiverExt;
use ::oak::WithInit;
let init_wrapper = receiver.receive().expect("could not receive init");
receiver.close().expect("could not close the receiver channel");
let instance = <$handler>::create(init_wrapper.init);
::oak::run_command_loop(instance, init_wrapper.command_receiver.iter());
});
Expand Down

0 comments on commit 0fbde16

Please sign in to comment.