diff --git a/tests/tcp.rs b/tests/tcp.rs deleted file mode 100644 index 13acec75b91f4..0000000000000 --- a/tests/tcp.rs +++ /dev/null @@ -1,276 +0,0 @@ -#![cfg(all( - feature = "sinks-socket", - feature = "transforms-sample", - feature = "sources-socket", -))] - -use approx::assert_relative_eq; -use vector::{ - config, sinks, sources, - test_util::{ - next_addr, random_lines, send_lines, start_topology, trace_init, wait_for_tcp, - CountReceiver, - }, - transforms, -}; - -#[tokio::test] -async fn pipe() { - let num_lines: usize = 10000; - - let in_addr = next_addr(); - let out_addr = next_addr(); - - let mut config = config::Config::builder(); - config.add_source( - "in", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr), - ); - config.add_sink( - "out", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()), - ); - - let mut output_lines = CountReceiver::receive_lines(out_addr); - - let (topology, _crash) = start_topology(config.build().unwrap(), false).await; - // Wait for server to accept traffic - wait_for_tcp(in_addr).await; - - // Wait for output to connect - output_lines.connected().await; - - let input_lines = random_lines(100).take(num_lines).collect::>(); - send_lines(in_addr, input_lines.clone()).await.unwrap(); - - // Shut down server - topology.stop().await; - - let output_lines = output_lines.await; - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); -} - -#[tokio::test] -async fn sample() { - let num_lines: usize = 10000; - - let in_addr = next_addr(); - let out_addr = next_addr(); - - let mut config = config::Config::builder(); - config.add_source( - "in", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr), - ); - config.add_transform( - "sample", - &["in"], - transforms::sample::SampleConfig { - rate: 10, - key_field: Some(config::log_schema().message_key().into()), - exclude: None, - }, - ); - config.add_sink( - "out", - &["sample"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()), - ); - - let mut output_lines = CountReceiver::receive_lines(out_addr); - - let (topology, _crash) = start_topology(config.build().unwrap(), false).await; - // Wait for server to accept traffic - wait_for_tcp(in_addr).await; - - // Wait for output to connect - output_lines.connected().await; - - let input_lines = random_lines(100).take(num_lines).collect::>(); - send_lines(in_addr, input_lines.clone()).await.unwrap(); - - // Shut down server - topology.stop().await; - - let output_lines = output_lines.await; - let num_output_lines = output_lines.len(); - - let output_lines_ratio = num_output_lines as f32 / num_lines as f32; - assert_relative_eq!(output_lines_ratio, 0.1, epsilon = 0.01); - - let mut input_lines = input_lines.into_iter(); - // Assert that all of the output lines were present in the input and in the same order - for output_line in output_lines { - let next_line = input_lines.by_ref().find(|l| l == &output_line); - assert_eq!(Some(output_line), next_line); - } -} - -#[tokio::test] -async fn fork() { - let num_lines: usize = 10000; - - let in_addr = next_addr(); - let out_addr1 = next_addr(); - let out_addr2 = next_addr(); - - let mut config = config::Config::builder(); - config.add_source( - "in", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr), - ); - config.add_sink( - "out1", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr1.to_string()), - ); - config.add_sink( - "out2", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr2.to_string()), - ); - - let mut output_lines1 = CountReceiver::receive_lines(out_addr1); - let mut output_lines2 = CountReceiver::receive_lines(out_addr2); - - let (topology, _crash) = start_topology(config.build().unwrap(), false).await; - // Wait for server to accept traffic - wait_for_tcp(in_addr).await; - - // Wait for output to connect - output_lines1.connected().await; - output_lines2.connected().await; - - let input_lines = random_lines(100).take(num_lines).collect::>(); - send_lines(in_addr, input_lines.clone()).await.unwrap(); - - // Shut down server - topology.stop().await; - - let output_lines1 = output_lines1.await; - let output_lines2 = output_lines2.await; - assert_eq!(num_lines, output_lines1.len()); - assert_eq!(num_lines, output_lines2.len()); - assert_eq!(input_lines, output_lines1); - assert_eq!(input_lines, output_lines2); -} - -// In cpu constrained environments at least three threads -// are needed to finish processing all the events before -// sources are forcefully shutted down. -// Although that's still not a guarantee. -#[tokio::test(flavor = "multi_thread", worker_threads = 3)] -async fn merge_and_fork() { - trace_init(); - - let num_lines: usize = 10000; - - let in_addr1 = next_addr(); - let in_addr2 = next_addr(); - let out_addr1 = next_addr(); - let out_addr2 = next_addr(); - - // out1 receives both in1 and in2 - // out2 receives in2 only - let mut config = config::Config::builder(); - config.add_source( - "in1", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr1), - ); - config.add_source( - "in2", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr2), - ); - config.add_sink( - "out1", - &["in1", "in2"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr1.to_string()), - ); - config.add_sink( - "out2", - &["in2"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr2.to_string()), - ); - - let mut output_lines1 = CountReceiver::receive_lines(out_addr1); - let mut output_lines2 = CountReceiver::receive_lines(out_addr2); - - let (topology, _crash) = start_topology(config.build().unwrap(), false).await; - // Wait for server to accept traffic - wait_for_tcp(in_addr1).await; - wait_for_tcp(in_addr2).await; - - // Wait for output to connect - output_lines1.connected().await; - output_lines2.connected().await; - - let input_lines1 = random_lines(100).take(num_lines).collect::>(); - let input_lines2 = random_lines(100).take(num_lines).collect::>(); - send_lines(in_addr1, input_lines1.clone()).await.unwrap(); - send_lines(in_addr2, input_lines2.clone()).await.unwrap(); - - // Accept connection in Vector, before shutdown - tokio::task::yield_now().await; - - // Shut down server - topology.stop().await; - - let output_lines1 = output_lines1.await; - let output_lines2 = output_lines2.await; - - assert_eq!(input_lines1.len() + input_lines2.len(), output_lines1.len()); - assert_eq!(input_lines2.len(), output_lines2.len()); - assert_eq!(input_lines2, output_lines2); - - // Assert that all of the output lines were present in the input and in the same order - let mut input_lines1 = input_lines1.into_iter().peekable(); - let mut input_lines2 = input_lines2.into_iter().peekable(); - for output_line in &output_lines1 { - if Some(output_line) == input_lines1.peek() { - input_lines1.next(); - } else if Some(output_line) == input_lines2.peek() { - input_lines2.next(); - } else { - panic!("Got line in output that wasn't in input"); - } - } - assert_eq!(input_lines1.next(), None); - assert_eq!(input_lines2.next(), None); -} - -#[tokio::test] -async fn reconnect() { - let num_lines: usize = 1000; - - let in_addr = next_addr(); - let out_addr = next_addr(); - - let mut config = config::Config::builder(); - config.add_source( - "in", - sources::socket::SocketConfig::make_basic_tcp_config(in_addr), - ); - config.add_sink( - "out", - &["in"], - sinks::socket::SocketSinkConfig::make_basic_tcp_config(out_addr.to_string()), - ); - - let output_lines = CountReceiver::receive_lines(out_addr); - - let (topology, _crash) = start_topology(config.build().unwrap(), false).await; - // Wait for server to accept traffic - wait_for_tcp(in_addr).await; - - let input_lines = random_lines(100).take(num_lines).collect::>(); - send_lines(in_addr, input_lines.clone()).await.unwrap(); - - // Shut down server and wait for it to fully flush - topology.stop().await; - - let output_lines = output_lines.await; - assert!(num_lines >= 2); - assert!(output_lines.iter().all(|line| input_lines.contains(line))) -}