From 521086d56b6fe42a87c5d364c2562cfa51152bf6 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Mon, 17 Oct 2022 22:52:44 -0700 Subject: [PATCH] Make federated client more robust (#8351) --- plugin/federated/federated_client.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/plugin/federated/federated_client.h b/plugin/federated/federated_client.h index 592ab75edce8..2b4637339199 100644 --- a/plugin/federated/federated_client.h +++ b/plugin/federated/federated_client.h @@ -28,8 +28,11 @@ class FederatedClient { options.pem_cert_chain = client_cert; grpc::ChannelArguments args; args.SetMaxReceiveMessageSize(std::numeric_limits<int>::max()); - return Federated::NewStub( - grpc::CreateCustomChannel(server_address, grpc::SslCredentials(options), args)); + auto channel = + grpc::CreateCustomChannel(server_address, grpc::SslCredentials(options), args); + channel->WaitForConnected( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(60, GPR_TIMESPAN))); + return Federated::NewStub(channel); }()}, rank_{rank} {} @@ -51,6 +54,7 @@ class FederatedClient { AllgatherReply reply; grpc::ClientContext context; + context.set_wait_for_ready(true); grpc::Status status = stub_->Allgather(&context, request, &reply); if (status.ok()) { @@ -72,6 +76,7 @@ class FederatedClient { AllreduceReply reply; grpc::ClientContext context; + context.set_wait_for_ready(true); grpc::Status status = stub_->Allreduce(&context, request, &reply); if (status.ok()) { @@ -91,6 +96,7 @@ class FederatedClient { BroadcastReply reply; grpc::ClientContext context; + context.set_wait_for_ready(true); grpc::Status status = stub_->Broadcast(&context, request, &reply); if (status.ok()) {