From 7d36dcfc43095791dffeea219fc5aa12b6a021f3 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Thu, 3 Nov 2022 16:54:41 +0000 Subject: [PATCH 1/4] A basic set of EventEngine listener tests --- test/core/event_engine/test_suite/BUILD | 1 + .../event_engine/test_suite/server_test.cc | 257 +++++++++++++++++- 2 files changed, 255 insertions(+), 3 deletions(-) diff --git a/test/core/event_engine/test_suite/BUILD b/test/core/event_engine/test_suite/BUILD index 51b1ccaadbcb1..8c33a9018a877 100644 --- a/test/core/event_engine/test_suite/BUILD +++ b/test/core/event_engine/test_suite/BUILD @@ -153,6 +153,7 @@ grpc_cc_test( deps = [ ":client", ":oracle_event_engine_posix", + ":server", "//:grpc", "//test/core/util:grpc_test_util", ], diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index 7f58ffbdc3cb1..a2b2fe2e0c1c4 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -12,12 +12,263 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/strings/str_cat.h" +#include "absl/time/clock.h" +#include "absl/time/time.h" +#include "gtest/gtest.h" + +#include +#include +#include +#include + +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/event_engine/channel_args_endpoint_config.h" +#include "src/core/lib/gprpp/notification.h" #include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/resource_quota/memory_quota.h" +#include "src/core/lib/resource_quota/resource_quota.h" #include "test/core/event_engine/test_suite/event_engine_test.h" +#include "test/core/event_engine/test_suite/event_engine_test_utils.h" +#include "test/core/util/port.h" class EventEngineServerTest : public EventEngineTest {}; -// TODO(hork): establish meaningful tests -TEST_F(EventEngineServerTest, TODO) { grpc_core::ExecCtx exec_ctx; } +using namespace std::chrono_literals; + +namespace { + +using ::grpc_event_engine::experimental::ChannelArgsEndpointConfig; +using ::grpc_event_engine::experimental::EventEngine; +using ::grpc_event_engine::experimental::URIToResolvedAddress; +using Endpoint = ::grpc_event_engine::experimental::EventEngine::Endpoint; +using Listener = ::grpc_event_engine::experimental::EventEngine::Listener; +using ::grpc_event_engine::experimental::GetNextSendMessage; +using ::grpc_event_engine::experimental::WaitForSingleOwner; + +constexpr int kNumExchangedMessages = 100; + +} // namespace + +// Create a connection using the oracle EventEngine to a listener created +// by the Test EventEngine and exchange bi-di data over the connection. +// For each data transfer, verify that data written at one end of the stream +// equals data read at the other end of the stream. +TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { + grpc_core::ExecCtx ctx; + auto oracle_ee = this->NewOracleEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); + auto memory_quota = std::make_unique("bar"); + std::string target_addr = absl::StrCat( + "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + std::unique_ptr client_endpoint; + std::unique_ptr server_endpoint; + grpc_core::Notification client_signal; + grpc_core::Notification server_signal; + + Listener::AcceptCallback accept_cb = + [&server_endpoint, &server_signal]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint = std::move(ep); + server_signal.Notify(); + }; + + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); + auto status = test_ee->CreateListener( + std::move(accept_cb), [](absl::Status /*status*/) {}, config, + std::make_unique("foo")); + EXPECT_TRUE(status.ok()); + + std::unique_ptr listener = std::move(*status); + EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + EXPECT_TRUE(listener->Start().ok()); + + oracle_ee->Connect( + [&client_endpoint, + &client_signal](absl::StatusOr> status) { + if (!status.ok()) { + gpr_log(GPR_ERROR, "Connect failed: %s", + status.status().ToString().c_str()); + client_endpoint = nullptr; + } else { + client_endpoint = std::move(*status); + } + client_signal.Notify(); + }, + URIToResolvedAddress(target_addr), config, + memory_quota->CreateMemoryAllocator("conn-1"), 24h); + + client_signal.WaitForNotification(); + server_signal.WaitForNotification(); + EXPECT_NE(client_endpoint.get(), nullptr); + EXPECT_NE(server_endpoint.get(), nullptr); + + // Alternate message exchanges between client -- server and server -- + // client. + for (int i = 0; i < kNumExchangedMessages; i++) { + // Send from client to server and verify data read at the server. + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + server_endpoint.get()) + .ok()); + + // Send from server to client and verify data read at the client. + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + client_endpoint.get()) + .ok()); + } + client_endpoint.reset(); + server_endpoint.reset(); + listener.reset(); + WaitForSingleOwner(std::move(test_ee)); +} + +// Create 1 listener bound to N IPv6 addresses and M connections where M > N and +// exchange and verify random number of messages over each connection. +TEST_F(EventEngineServerTest, + ServerMultipleIPv6ConnectionsToOneOracleListenerTest) { + grpc_core::ExecCtx ctx; + static constexpr int kNumListenerAddresses = 10; // N + static constexpr int kNumConnections = 10; // M + auto oracle_ee = this->NewOracleEventEngine(); + std::shared_ptr test_ee(this->NewEventEngine()); + auto memory_quota = std::make_unique("bar"); + std::unique_ptr server_endpoint; + // Notifications can only be fired once, so they are newed every loop + grpc_core::Notification* server_signal = new grpc_core::Notification(); + std::vector target_addrs; + std::vector, std::unique_ptr>> + connections; + + Listener::AcceptCallback accept_cb = + [&server_endpoint, &server_signal]( + std::unique_ptr ep, + grpc_core::MemoryAllocator /*memory_allocator*/) { + server_endpoint = std::move(ep); + server_signal->Notify(); + }; + grpc_core::ChannelArgs args; + auto quota = grpc_core::ResourceQuota::Default(); + args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); + ChannelArgsEndpointConfig config(args); + auto status = test_ee->CreateListener( + std::move(accept_cb), [](absl::Status /*status*/) {}, config, + std::make_unique("foo")); + EXPECT_TRUE(status.ok()); + std::unique_ptr listener = std::move(*status); + + target_addrs.reserve(kNumListenerAddresses); + for (int i = 0; i < kNumListenerAddresses; i++) { + std::string target_addr = absl::StrCat( + "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); + EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + target_addrs.push_back(target_addr); + } + EXPECT_TRUE(listener->Start().ok()); + absl::SleepFor(absl::Milliseconds(500)); + for (int i = 0; i < kNumConnections; i++) { + std::unique_ptr client_endpoint; + grpc_core::Notification client_signal; + // Create an oracle EventEngine client and connect to a one of the + // addresses bound to the test EventEngine listener. Verify that the + // connection succeeds. + grpc_core::ChannelArgs client_args; + auto client_quota = grpc_core::ResourceQuota::Default(); + client_args = client_args.Set(GRPC_ARG_RESOURCE_QUOTA, client_quota); + ChannelArgsEndpointConfig client_config(client_args); + oracle_ee->Connect( + [&client_endpoint, + &client_signal](absl::StatusOr> status) { + if (!status.ok()) { + gpr_log(GPR_ERROR, "Connect failed: %s", + status.status().ToString().c_str()); + client_endpoint = nullptr; + } else { + client_endpoint = std::move(*status); + } + client_signal.Notify(); + }, + URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), + client_config, + memory_quota->CreateMemoryAllocator( + absl::StrCat("conn-", std::to_string(i))), + 24h); + + client_signal.WaitForNotification(); + server_signal->WaitForNotification(); + EXPECT_NE(client_endpoint.get(), nullptr); + EXPECT_NE(server_endpoint.get(), nullptr); + connections.push_back(std::make_tuple(std::move(client_endpoint), + std::move(server_endpoint))); + delete server_signal; + server_signal = new grpc_core::Notification(); + } + delete server_signal; + + std::vector threads; + // Create one thread for each connection. For each connection, create + // 2 more worker threads: to exchange and verify bi-directional data + // transfer. + threads.reserve(kNumConnections); + for (int i = 0; i < kNumConnections; i++) { + // For each connection, simulate a parallel bi-directional data transfer. + // All bi-directional transfers are run in parallel across all + // connections. Each bi-directional data transfer uses a random number of + // messages. + threads.emplace_back([client_endpoint = + std::move(std::get<0>(connections[i])), + server_endpoint = + std::move(std::get<1>(connections[i]))]() { + std::vector workers; + workers.reserve(2); + auto worker = [client_endpoint = client_endpoint.get(), + server_endpoint = + server_endpoint.get()](bool client_to_server) { + grpc_core::ExecCtx ctx; + for (int i = 0; i < kNumExchangedMessages; i++) { + // If client_to_server is true, send from client to server and + // verify data read at the server. Otherwise send data from server + // to client and verify data read at client. + if (client_to_server) { + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + client_endpoint, server_endpoint) + .ok()); + } else { + EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + server_endpoint, client_endpoint) + .ok()); + } + } + }; + // worker[0] simulates a flow from client to server endpoint + workers.emplace_back([&worker]() { worker(true); }); + // worker[1] simulates a flow from server to client endpoint + workers.emplace_back([&worker]() { worker(false); }); + workers[0].join(); + workers[1].join(); + }); + } + for (auto& t : threads) { + t.join(); + } + server_endpoint.reset(); + listener.reset(); + WaitForSingleOwner(std::move(test_ee)); +} + +// TODO(vigneshbabu): Add more tests which create listeners bound to a mix +// Ipv6 and other type of addresses (UDS) in the same test. From 97aa0f7964b29fcea8145f20e25c11a3274b6edf Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Sat, 5 Nov 2022 20:14:18 +0000 Subject: [PATCH 2/4] review comments --- CMakeLists.txt | 1 + build_autogenerated.yaml | 1 + .../event_engine/test_suite/client_test.cc | 46 +++++++----------- .../event_engine/test_suite/server_test.cc | 48 +++++++------------ 4 files changed, 37 insertions(+), 59 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 55226f9dccfb2..f41beeb3c3556 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14095,6 +14095,7 @@ if(_gRPC_PLATFORM_LINUX OR _gRPC_PLATFORM_MAC OR _gRPC_PLATFORM_POSIX) test/core/event_engine/test_suite/event_engine_test_utils.cc test/core/event_engine/test_suite/oracle_event_engine_posix.cc test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + test/core/event_engine/test_suite/server_test.cc third_party/googletest/googletest/src/gtest-all.cc third_party/googletest/googlemock/src/gmock-all.cc ) diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml index cc8ef3c222f54..3a86049834b22 100644 --- a/build_autogenerated.yaml +++ b/build_autogenerated.yaml @@ -8236,6 +8236,7 @@ targets: - test/core/event_engine/test_suite/event_engine_test_utils.cc - test/core/event_engine/test_suite/oracle_event_engine_posix.cc - test/core/event_engine/test_suite/oracle_event_engine_posix_test.cc + - test/core/event_engine/test_suite/server_test.cc deps: - grpc_test_util platforms: diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index 34ac4f05570ae..6b01ae1f98c4d 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -76,7 +76,7 @@ TEST_F(EventEngineClientTest, ConnectToNonExistentListenerTest) { test_ee->Connect( [&signal](absl::StatusOr> status) { // Connect should fail. - EXPECT_FALSE(status.ok()); + ASSERT_FALSE(status.ok()); signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -114,26 +114,20 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = oracle_ee->CreateListener( + auto listener = oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); + ASSERT_TRUE(listener.ok()); - std::unique_ptr listener = std::move(*status); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); - EXPECT_TRUE(listener->Start().ok()); + EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + EXPECT_TRUE((*listener)->Start().ok()); test_ee->Connect( [&client_endpoint, &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + ASSERT_TRUE(status.ok()); + client_endpoint = std::move(*status); client_signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -141,19 +135,19 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { client_signal.WaitForNotification(); server_signal.WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); // Alternate message exchanges between client -- server and server -- // client. for (int i = 0; i < kNumExchangedMessages; i++) { // Send from client to server and verify data read at the server. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), server_endpoint.get()) .ok()); // Send from server to client and verify data read at the client. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), client_endpoint.get()) .ok()); } @@ -189,21 +183,20 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = oracle_ee->CreateListener( + auto listener = oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); - std::unique_ptr listener = std::move(*status); + ASSERT_TRUE(listener.ok()); target_addrs.reserve(kNumListenerAddresses); for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - EXPECT_TRUE(listener->Start().ok()); + EXPECT_TRUE((*listener)->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -218,13 +211,8 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { test_ee->Connect( [&client_endpoint, &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + ASSERT_TRUE(status.ok()); + client_endpoint = std::move(*status); client_signal.Notify(); }, URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index a2b2fe2e0c1c4..554c9acf1e6f4 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -89,25 +89,19 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = test_ee->CreateListener( + auto listener = test_ee->CreateListener( std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); + EXPECT_TRUE(listener.ok()); - std::unique_ptr listener = std::move(*status); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); - EXPECT_TRUE(listener->Start().ok()); + EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + EXPECT_TRUE((*listener)->Start().ok()); oracle_ee->Connect( [&client_endpoint, &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + ASSERT_TRUE(status.ok()); + client_endpoint = std::move(*status); client_signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -133,7 +127,7 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { } client_endpoint.reset(); server_endpoint.reset(); - listener.reset(); + (*listener).reset(); WaitForSingleOwner(std::move(test_ee)); } @@ -165,20 +159,19 @@ TEST_F(EventEngineServerTest, auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto status = test_ee->CreateListener( + auto listener = test_ee->CreateListener( std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - EXPECT_TRUE(status.ok()); - std::unique_ptr listener = std::move(*status); + ASSERT_TRUE(listener.ok()); target_addrs.reserve(kNumListenerAddresses); for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EXPECT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - EXPECT_TRUE(listener->Start().ok()); + ASSERT_TRUE((*listener)->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -193,13 +186,8 @@ TEST_F(EventEngineServerTest, oracle_ee->Connect( [&client_endpoint, &client_signal](absl::StatusOr> status) { - if (!status.ok()) { - gpr_log(GPR_ERROR, "Connect failed: %s", - status.status().ToString().c_str()); - client_endpoint = nullptr; - } else { - client_endpoint = std::move(*status); - } + ASSERT_TRUE(status.ok()); + client_endpoint = std::move(*status); client_signal.Notify(); }, URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), @@ -210,8 +198,8 @@ TEST_F(EventEngineServerTest, client_signal.WaitForNotification(); server_signal->WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); connections.push_back(std::make_tuple(std::move(client_endpoint), std::move(server_endpoint))); delete server_signal; @@ -244,11 +232,11 @@ TEST_F(EventEngineServerTest, // verify data read at the server. Otherwise send data from server // to client and verify data read at client. if (client_to_server) { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint, server_endpoint) .ok()); } else { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint, client_endpoint) .ok()); } @@ -266,7 +254,7 @@ TEST_F(EventEngineServerTest, t.join(); } server_endpoint.reset(); - listener.reset(); + (*listener).reset(); WaitForSingleOwner(std::move(test_ee)); } From 6c2d8a98497a1a427e687a40872f4c6d0e6c0692 Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Sat, 5 Nov 2022 20:18:31 +0000 Subject: [PATCH 3/4] replace expect with asserts --- test/core/event_engine/test_suite/client_test.cc | 16 ++++++++-------- test/core/event_engine/test_suite/server_test.cc | 14 +++++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index 6b01ae1f98c4d..b2f218a9bcece 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -120,8 +120,8 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { std::make_unique("foo")); ASSERT_TRUE(listener.ok()); - EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); - EXPECT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE((*listener)->Start().ok()); test_ee->Connect( [&client_endpoint, @@ -193,10 +193,10 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - EXPECT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE((*listener)->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -223,8 +223,8 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { client_signal.WaitForNotification(); server_signal->WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); connections.push_back(std::make_tuple(std::move(client_endpoint), std::move(server_endpoint))); delete server_signal; @@ -257,11 +257,11 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { // verify data read at the server. Otherwise send data from server // to client and verify data read at client. if (client_to_server) { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint, server_endpoint) .ok()); } else { - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint, client_endpoint) .ok()); } diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index 554c9acf1e6f4..a42e93ec837f0 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -92,10 +92,10 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { auto listener = test_ee->CreateListener( std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - EXPECT_TRUE(listener.ok()); + ASSERT_TRUE(listener.ok()); - EXPECT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); - EXPECT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE((*listener)->Start().ok()); oracle_ee->Connect( [&client_endpoint, @@ -109,19 +109,19 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { client_signal.WaitForNotification(); server_signal.WaitForNotification(); - EXPECT_NE(client_endpoint.get(), nullptr); - EXPECT_NE(server_endpoint.get(), nullptr); + ASSERT_NE(client_endpoint.get(), nullptr); + ASSERT_NE(server_endpoint.get(), nullptr); // Alternate message exchanges between client -- server and server -- // client. for (int i = 0; i < kNumExchangedMessages; i++) { // Send from client to server and verify data read at the server. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), client_endpoint.get(), server_endpoint.get()) .ok()); // Send from server to client and verify data read at the client. - EXPECT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), + ASSERT_TRUE(SendValidatePayload(GetNextSendMessage(), server_endpoint.get(), client_endpoint.get()) .ok()); } From a64229983a4c5d5db99b3c7aa88bbbd9b3b2542e Mon Sep 17 00:00:00 2001 From: Vignesh Babu Date: Wed, 9 Nov 2022 01:26:08 +0000 Subject: [PATCH 4/4] review comments --- .../event_engine/test_suite/client_test.cc | 26 ++++++++-------- .../event_engine/test_suite/server_test.cc | 30 +++++++++---------- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/test/core/event_engine/test_suite/client_test.cc b/test/core/event_engine/test_suite/client_test.cc index b2f218a9bcece..c894ff2c6deca 100644 --- a/test/core/event_engine/test_suite/client_test.cc +++ b/test/core/event_engine/test_suite/client_test.cc @@ -114,20 +114,19 @@ TEST_F(EventEngineClientTest, ConnectExchangeBidiDataTransferTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto listener = oracle_ee->CreateListener( + auto listener = *oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - ASSERT_TRUE(listener.ok()); - ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); - ASSERT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Start().ok()); test_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - ASSERT_TRUE(status.ok()); - client_endpoint = std::move(*status); + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -183,20 +182,19 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto listener = oracle_ee->CreateListener( + auto listener = *oracle_ee->CreateListener( std::move(accept_cb), [](absl::Status status) { GPR_ASSERT(status.ok()); }, config, std::make_unique("foo")); - ASSERT_TRUE(listener.ok()); target_addrs.reserve(kNumListenerAddresses); for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - ASSERT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE(listener->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -210,9 +208,9 @@ TEST_F(EventEngineClientTest, MultipleIPv6ConnectionsToOneOracleListenerTest) { ChannelArgsEndpointConfig client_config(client_args); test_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - ASSERT_TRUE(status.ok()); - client_endpoint = std::move(*status); + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), diff --git a/test/core/event_engine/test_suite/server_test.cc b/test/core/event_engine/test_suite/server_test.cc index a42e93ec837f0..55627593fdbb6 100644 --- a/test/core/event_engine/test_suite/server_test.cc +++ b/test/core/event_engine/test_suite/server_test.cc @@ -89,19 +89,18 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto listener = test_ee->CreateListener( + auto listener = *test_ee->CreateListener( std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - ASSERT_TRUE(listener.ok()); - ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); - ASSERT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Start().ok()); oracle_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - ASSERT_TRUE(status.ok()); - client_endpoint = std::move(*status); + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addr), config, @@ -127,7 +126,7 @@ TEST_F(EventEngineServerTest, ServerConnectExchangeBidiDataTransferTest) { } client_endpoint.reset(); server_endpoint.reset(); - (*listener).reset(); + listener.reset(); WaitForSingleOwner(std::move(test_ee)); } @@ -159,19 +158,18 @@ TEST_F(EventEngineServerTest, auto quota = grpc_core::ResourceQuota::Default(); args = args.Set(GRPC_ARG_RESOURCE_QUOTA, quota); ChannelArgsEndpointConfig config(args); - auto listener = test_ee->CreateListener( + auto listener = *test_ee->CreateListener( std::move(accept_cb), [](absl::Status /*status*/) {}, config, std::make_unique("foo")); - ASSERT_TRUE(listener.ok()); target_addrs.reserve(kNumListenerAddresses); for (int i = 0; i < kNumListenerAddresses; i++) { std::string target_addr = absl::StrCat( "ipv6:[::1]:", std::to_string(grpc_pick_unused_port_or_die())); - ASSERT_TRUE((*listener)->Bind(URIToResolvedAddress(target_addr)).ok()); + ASSERT_TRUE(listener->Bind(URIToResolvedAddress(target_addr)).ok()); target_addrs.push_back(target_addr); } - ASSERT_TRUE((*listener)->Start().ok()); + ASSERT_TRUE(listener->Start().ok()); absl::SleepFor(absl::Milliseconds(500)); for (int i = 0; i < kNumConnections; i++) { std::unique_ptr client_endpoint; @@ -185,9 +183,9 @@ TEST_F(EventEngineServerTest, ChannelArgsEndpointConfig client_config(client_args); oracle_ee->Connect( [&client_endpoint, - &client_signal](absl::StatusOr> status) { - ASSERT_TRUE(status.ok()); - client_endpoint = std::move(*status); + &client_signal](absl::StatusOr> endpoint) { + ASSERT_TRUE(endpoint.ok()); + client_endpoint = std::move(*endpoint); client_signal.Notify(); }, URIToResolvedAddress(target_addrs[i % kNumListenerAddresses]), @@ -254,7 +252,7 @@ TEST_F(EventEngineServerTest, t.join(); } server_endpoint.reset(); - (*listener).reset(); + listener.reset(); WaitForSingleOwner(std::move(test_ee)); }