Workaround for actix-web bug.
There is a bug in actix (actix/actix-web#1313)
that prevents it from dropping HTTP connections on client disconnect
unless the endpoint periodically sends some data.  As a result, all
API-based data transfers eventually start to fail in the UI.  As a
workaround, we generate an empty output chunk if there is no real
payload to send for more than 3 seconds.
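
The essence of the workaround is to wrap the broadcast-channel receive in
`tokio::time::timeout` and emit an empty chunk whenever the timer fires. A
minimal sketch of the pattern, for context; the function name `forward_chunks`
and the `emit` callback are illustrative, not part of this commit:

    use std::time::Duration;
    use tokio::{sync::broadcast, time::timeout};

    // Forward encoded chunks from `rx` to the HTTP response via `emit`,
    // sending an empty keep-alive chunk after 3 idle seconds so that
    // actix eventually notices a disconnected client.
    async fn forward_chunks(
        mut rx: broadcast::Receiver<Vec<u8>>,
        mut emit: impl FnMut(Vec<u8>),
    ) {
        loop {
            match timeout(Duration::from_secs(3), rx.recv()).await {
                // Timer fired with no payload: send a keep-alive chunk.
                Err(_elapsed) => emit(Vec::new()),
                Ok(Ok(chunk)) => emit(chunk),
                Ok(Err(broadcast::error::RecvError::Lagged(_))) => continue,
                Ok(Err(broadcast::error::RecvError::Closed)) => break,
            }
        }
    }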

Signed-off-by: Leonid Ryzhyk <leonid@feldera.com>
Leonid Ryzhyk authored and ryzhyk committed Jul 19, 2023
1 parent ee088af commit 8dcf1a6
Showing 3 changed files with 92 additions and 50 deletions.
2 changes: 1 addition & 1 deletion crates/adapters/src/controller/mod.rs
@@ -70,7 +70,7 @@ pub use stats::{ControllerStatus, InputEndpointStatus, OutputEndpointStatus};
 /// Maximal number of concurrent API connections per circuit
 /// (including both input and output connections).
 // TODO: make this configurable.
-const MAX_API_CONNECTIONS: u64 = 100;
+pub(crate) const MAX_API_CONNECTIONS: u64 = 100;
 
 pub(crate) type EndpointId = u64;
 
32 changes: 27 additions & 5 deletions crates/adapters/src/server/mod.rs
@@ -773,11 +773,14 @@ async fn output_endpoint(
 #[cfg(feature = "server")]
 mod test_with_kafka {
     use super::{bootstrap, build_app, Args, ServerState};
-    use crate::test::{
-        generate_test_batches,
-        http::{TestHttpReceiver, TestHttpSender},
-        kafka::{BufferConsumer, KafkaResources, TestProducer},
-        test_circuit,
+    use crate::{
+        controller::MAX_API_CONNECTIONS,
+        test::{
+            generate_test_batches,
+            http::{TestHttpReceiver, TestHttpSender},
+            kafka::{BufferConsumer, KafkaResources, TestProducer},
+            test_circuit,
+        },
     };
     use actix_web::{http::StatusCode, middleware::Logger, web::Data as WebData, App};
     use futures_util::StreamExt;
@@ -940,6 +943,23 @@ outputs:
             }
         }
 
+        // Make sure that HTTP connections get dropped on client disconnect
+        // (see comment in `HttpOutputEndpoint::request`). We create 2x the
+        // number of supported simultaneous API connections and drop the client
+        // side instantly, which should cause the server side to close within
+        // 6 seconds. If everything works as intended, this should _not_
+        // trigger the API connection limit error.
+        for _ in 0..2 * MAX_API_CONNECTIONS {
+            assert!(server
+                .get("/egress/test_output1")
+                .send()
+                .await
+                .unwrap()
+                .status()
+                .is_success());
+            sleep(Duration::from_millis(75));
+        }
+
         println!("Connecting to HTTP output endpoint");
         let mut resp1 = server.get("/egress/test_output1").send().await.unwrap();
 
@@ -969,6 +989,8 @@
         drop(resp1);
         drop(resp2);
 
+        sleep(Duration::from_millis(5000));
+
         // Request quantiles.
         let mut quantiles_resp1 = server
             .get("/egress/test_output1?mode=snapshot&query=quantiles")
108 changes: 64 additions & 44 deletions crates/adapters/src/transport/http/output.rs
@@ -12,8 +12,12 @@ use std::{
         atomic::{AtomicU64, Ordering},
         Arc,
     },
+    time::Duration,
 };
-use tokio::sync::broadcast::{self, error::RecvError};
+use tokio::{
+    sync::broadcast::{self, error::RecvError},
+    time::timeout,
+};
 
 // TODO: make this configurable via endpoint config.
 const MAX_BUFFERS: usize = 100;
@@ -85,6 +89,47 @@ impl HttpOutputEndpointInner {
             // async_error_callback: RwLock::new(None),
         }
     }
+
+    fn push_buffer(&self, buffer: &[u8]) -> AnyResult<()> {
+        let seq_number = self.total_buffers.fetch_add(1, Ordering::AcqRel);
+
+        let json_buf = Vec::with_capacity(buffer.len() + 1024);
+        let mut serializer = serde_json::Serializer::new(json_buf);
+        let mut struct_serializer = serializer
+            .serialize_struct("Chunk", 2)
+            .map_err(|e| anyhow!("error serializing 'Chunk' struct: '{e}'"))?;
+        struct_serializer
+            .serialize_field("sequence_number", &seq_number)
+            .map_err(|e| anyhow!("error serializing 'sequence_number' field: '{e}'"))?;
+
+        match self.format {
+            Format::Binary => unimplemented!(),
+            Format::Text => {
+                let data_str = std::str::from_utf8(buffer)
+                    .map_err(|e| anyhow!("received an invalid UTF8 string from encoder: '{e}'"))?;
+                struct_serializer
+                    .serialize_field("text_data", data_str)
+                    .map_err(|e| anyhow!("error serializing 'text_data' field: '{e}'"))?;
+            }
+            Format::Json => unimplemented!(),
+        }
+        struct_serializer
+            .end()
+            .map_err(|e| anyhow!("error finalizing 'Chunk' struct: '{e}'"))?;
+
+        let mut json_buf = serializer.into_inner();
+        json_buf.push(b'\r');
+        json_buf.push(b'\n');
+
+        // A failure simply means that there are no receivers.
+        let _ = self
+            .sender
+            .read()
+            .unwrap()
+            .as_ref()
+            .map(|sender| sender.send(Buffer::new(seq_number, Bytes::from(json_buf))));
+        Ok(())
+    }
 }
 
 struct RequestGuard {
@@ -150,15 +195,28 @@ impl HttpOutputEndpoint {
         let name = self.name().to_string();
         let guard = RequestGuard::new(finalizer);
 
+        let inner = self.inner.clone();
+
         HttpResponse::Ok()
             .insert_header(ContentType::json())
             .streaming(stream! {
                 let _guard = guard;
                 loop {
-                    match receiver.recv().await {
-                        Err(RecvError::Closed) => break,
-                        Err(RecvError::Lagged(_)) => (),
-                        Ok(buffer) => {
+                    // There is a bug in actix (https://github.com/actix/actix-web/issues/1313)
+                    // that prevents it from dropping HTTP connections on client disconnect
+                    // unless the endpoint periodically sends some data. As a workaround,
+                    // if there is no real payload to send for more than 3 seconds, we
+                    // generate an empty chunk. Note that it takes 6s, i.e., 2x the timeout
+                    // period, for actix to actually drop the connection.
+                    match timeout(Duration::from_millis(3_000), receiver.recv()).await {
+                        Err(_) => {
+                            // Send the empty chunk via the `push_buffer` method to
+                            // make sure it gets assigned a correct sequence number.
+                            let _ = inner.push_buffer(&[]);
+                        }
+                        Ok(Err(RecvError::Closed)) => break,
+                        Ok(Err(RecvError::Lagged(_))) => (),
+                        Ok(Ok(buffer)) => {
                             debug!(
                                 "HTTP output endpoint '{}': sending chunk #{} ({} bytes)",
                                 name,
@@ -181,45 +239,7 @@ impl OutputEndpoint for HttpOutputEndpoint {
     }
 
     fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<()> {
-        let seq_number = self.inner.total_buffers.fetch_add(1, Ordering::AcqRel);
-
-        let json_buf = Vec::with_capacity(buffer.len() + 1024);
-        let mut serializer = serde_json::Serializer::new(json_buf);
-        let mut struct_serializer = serializer
-            .serialize_struct("Chunk", 2)
-            .map_err(|e| anyhow!("error serializing 'Chunk' struct: '{e}'"))?;
-        struct_serializer
-            .serialize_field("sequence_number", &seq_number)
-            .map_err(|e| anyhow!("error serializing 'sequence_number' field: '{e}'"))?;
-
-        match self.inner.format {
-            Format::Binary => unimplemented!(),
-            Format::Text => {
-                let data_str = std::str::from_utf8(buffer)
-                    .map_err(|e| anyhow!("received an invalid UTF8 string from encoder: '{e}'"))?;
-                struct_serializer
-                    .serialize_field("text_data", data_str)
-                    .map_err(|e| anyhow!("error serializing 'text_data' field: '{e}'"))?;
-            }
-            Format::Json => unimplemented!(),
-        }
-        struct_serializer
-            .end()
-            .map_err(|e| anyhow!("error serializing 'text_data' field: '{e}'"))?;
-
-        let mut json_buf = serializer.into_inner();
-        json_buf.push(b'\r');
-        json_buf.push(b'\n');
-
-        // A failure simply means that there are no receivers.
-        let _ = self
-            .inner
-            .sender
-            .read()
-            .unwrap()
-            .as_ref()
-            .map(|sender| sender.send(Buffer::new(seq_number, Bytes::from(json_buf))));
-        Ok(())
+        self.inner.push_buffer(buffer)
     }
 
     fn batch_end(&mut self) -> AnyResult<()> {
