diff --git a/CMakeLists.txt b/CMakeLists.txt index 55226f9dccfb2..f41beeb3c3556 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14095,6 +14095,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) test/core/event_engine/test_suite/event_engine_test_utils.cc test/core/event_engine/test_suite/oracle_event_engine_posix.cc test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + test/core/event_engine/test_suite/server_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index cc8ef3c222f54..3a86049834b22 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8236,6 +8236,7 @@ targets: - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/oracle_event_engine_posix.cc - test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + - test/core/event_engine/test_suite/server_test.cc deps: - grpc_test_util platforms: diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 51b1ccaadbcb1..8c33a9018a877 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -153,6 +153,7 @@ grpc_cc_test( deps = [ ":client", ":oracle_event_engine_posix", + ":server", "//:grpc", "//test/core/util:grpc_test_util", ], diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index 34ac4f05570ae..c894ff2c6deca 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -76,7 +76,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { test_ee->Connect( [&signal](absl::StatusOr> status) { // Connect should fail. - EXPECT_FALSE(status.ok()); + ASSERT_FALSE(status.ok()); signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -114,26 +114,19 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = oracle_ee->CreateListener( + auto listener = *oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); - std::unique_ptr listener = std::move(*status); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); - EXPECT_TRUE(listener->Start().ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Start().ok()); test_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -141,19 +134,19 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_signal.WaitForNotification(); server_signal.WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); // Alternate message exchanges between client -- server and server -- // client. for (int i = 0; i < kNumExchangedMessages; i++) { // Send from client to server and verify data read at the server. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), server_endpoint.get()) .ok()); // Send from server to client and verify data read at the client. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), client_endpoint.get()) .ok()); } @@ -189,21 +182,19 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = oracle_ee->CreateListener( + auto listener = *oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); - std::unique_ptr listener = std::move(*status); target_addrs.reserve(kNumListenerAddresses); for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - EXPECT_TRUE(listener->Start().ok()); + ASSERT_TRUE(listener->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -217,14 +208,9 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { ChannelArgsEndpointConfig client_config(client_args); test_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), @@ -235,8 +221,8 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { client_signal.WaitForNotification(); server_signal->WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); connections.push_back(std::make_tuple(std::move(client_endpoint), std::move(server_endpoint))); delete server_signal; @@ -269,11 +255,11 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { // verify data read at the server. Otherwise send data from server // to client and verify data read at client. if (client_to_server) { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint, server_endpoint) .ok()); } else { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint, client_endpoint) .ok()); } diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index 7f58ffbdc3cb1..55627593fdbb6 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -12,12 +12,249 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gtest/gtest.h" + +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "test/core/event_engine/test_suite/event_engine_test.h" +#include "test/core/event_engine/test_suite/event_engine_test_utils.h" +#include "test/core/util/port.h" class EventEngineServerTest : public EventEngineTest {}; -// TODO(hork): establish meaningful tests -TEST_F(EventEngineServerTest, TODO) { grpc_core::ExecCtx exec_ctx; } +using namespace std::chrono_literals; + +namespace { + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::URIToResolvedAddress; +using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; +using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; +using ::grpc_event_engine::experimental::GetNextSendMessage; +using ::grpc_event_engine::experimental::WaitForSingleOwner; + +constexpr int kNumExchangedMessages = 100; + +} // namespace + +// Create a connection using the oracle EventEngine to a listener created +// by the Test EventEngine and exchange bi-di data over the connection. +// For each data transfer, verify that data written at one end of the stream +// equals data read at the other end of the stream. +TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { + grpc_core::ExecCtx ctx; + auto oracle_ee = this->NewOracleEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); + auto memory_quota = std::make_unique("bar"); + std::string target_addr = absl::StrCat( + "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + std::unique_ptr client_endpoint; + std::unique_ptr server_endpoint; + grpc_core::Notification client_signal; + grpc_core::Notification server_signal; + + Listener::AcceptCallback accept_cb = + [&server_endpoint, &server_signal]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint = std::move(ep); + server_signal.Notify(); + }; + + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); + auto listener = *test_ee->CreateListener( + std::move(accept_cb), [](absl::Status /*status*/) {}, config, + std::make_unique("foo")); + + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Start().ok()); + + oracle_ee->Connect( + [&client_endpoint, + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); + client_signal.Notify(); + }, + URIToResolvedAddress(target_addr), config, + memory_quota->CreateMemoryAllocator("conn-1"), 24h); + + client_signal.WaitForNotification(); + server_signal.WaitForNotification(); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); + + // Alternate message exchanges between client -- server and server -- + // client. + for (int i = 0; i < kNumExchangedMessages; i++) { + // Send from client to server and verify data read at the server. + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + server_endpoint.get()) + .ok()); + + // Send from server to client and verify data read at the client. + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + client_endpoint.get()) + .ok()); + } + client_endpoint.reset(); + server_endpoint.reset(); + listener.reset(); + WaitForSingleOwner(std::move(test_ee)); +} + +// Create 1 listener bound to N IPv6 addresses and M connections where M > N and +// exchange and verify random number of messages over each connection. +TEST_F(EventEngineServerTest, + ServerMultipleIPv6ConnectionsToOneOracleListenerTest) { + grpc_core::ExecCtx ctx; + static constexpr int kNumListenerAddresses = 10; // N + static constexpr int kNumConnections = 10; // M + auto oracle_ee = this->NewOracleEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); + auto memory_quota = std::make_unique("bar"); + std::unique_ptr server_endpoint; + // Notifications can only be fired once, so they are newed every loop + grpc_core::Notification* server_signal = new grpc_core::Notification(); + std::vector target_addrs; + std::vector, std::unique_ptr>> + connections; + + Listener::AcceptCallback accept_cb = + [&server_endpoint, &server_signal]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint = std::move(ep); + server_signal->Notify(); + }; + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); + auto listener = *test_ee->CreateListener( + std::move(accept_cb), [](absl::Status /*status*/) {}, config, + std::make_unique("foo")); + + target_addrs.reserve(kNumListenerAddresses); + for (int i = 0; i < kNumListenerAddresses; i++) { + std::string target_addr = absl::StrCat( + "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + target_addrs.push_back(target_addr); + } + ASSERT_TRUE(listener->Start().ok()); + absl::SleepFor(absl::Milliseconds(500)); + for (int i = 0; i < kNumConnections; i++) { + std::unique_ptr client_endpoint; + grpc_core::Notification client_signal; + // Create an oracle EventEngine client and connect to a one of the + // addresses bound to the test EventEngine listener. Verify that the + // connection succeeds. + grpc_core::ChannelArgs client_args; + auto client_quota = grpc_core::ResourceQuota::Default(); + client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota); + ChannelArgsEndpointConfig client_config(client_args); + oracle_ee->Connect( + [&client_endpoint, + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); + client_signal.Notify(); + }, + URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + client_config, + memory_quota->CreateMemoryAllocator( + absl::StrCat("conn-", std::to_string(i))), + 24h); + + client_signal.WaitForNotification(); + server_signal->WaitForNotification(); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); + connections.push_back(std::make_tuple(std::move(client_endpoint), + std::move(server_endpoint))); + delete server_signal; + server_signal = new grpc_core::Notification(); + } + delete server_signal; + + std::vector threads; + // Create one thread for each connection. For each connection, create + // 2 more worker threads: to exchange and verify bi-directional data + // transfer. + threads.reserve(kNumConnections); + for (int i = 0; i < kNumConnections; i++) { + // For each connection, simulate a parallel bi-directional data transfer. + // All bi-directional transfers are run in parallel across all + // connections. Each bi-directional data transfer uses a random number of + // messages. + threads.emplace_back([client_endpoint = + std::move(std::get<0>(connections[i])), + server_endpoint = + std::move(std::get<1>(connections[i]))]() { + std::vector workers; + workers.reserve(2); + auto worker = [client_endpoint = client_endpoint.get(), + server_endpoint = + server_endpoint.get()](bool client_to_server) { + grpc_core::ExecCtx ctx; + for (int i = 0; i < kNumExchangedMessages; i++) { + // If client_to_server is true, send from client to server and + // verify data read at the server. Otherwise send data from server + // to client and verify data read at client. + if (client_to_server) { + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), + client_endpoint, server_endpoint) + .ok()); + } else { + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), + server_endpoint, client_endpoint) + .ok()); + } + } + }; + // worker[0] simulates a flow from client to server endpoint + workers.emplace_back([&worker]() { worker(true); }); + // worker[1] simulates a flow from server to client endpoint + workers.emplace_back([&worker]() { worker(false); }); + workers[0].join(); + workers[1].join(); + }); + } + for (auto& t : threads) { + t.join(); + } + server_endpoint.reset(); + listener.reset(); + WaitForSingleOwner(std::move(test_ee)); +} + +// TODO(vigneshbabu): Add more tests which create listeners bound to a mix +// Ipv6 and other type of addresses (UDS) in the same test.