Merge branch 'PaddlePaddle:develop' into corrcoef
liqitong-a committed Apr 29, 2022
2 parents 4fca073 + 32cae24 commit 51da5d6
Showing 138 changed files with 3,764 additions and 1,693 deletions.
10 changes: 6 additions & 4 deletions paddle/fluid/distributed/collective/reducer.cc
@@ -447,10 +447,12 @@ void EagerReducer::TraverseBackwardGraph(const std::vector<Tensor> &outputs) {
while (!queue.empty()) {
egr::GradNodeBase *node = queue.front();
queue.pop();
const std::vector<std::vector<egr::Edge>> &edges = node->GetEdges();
for (size_t i = 0; i < edges.size(); i++) {
for (size_t j = 0; j < edges[i].size(); j++) {
const egr::Edge &edge = edges[i][j];
const paddle::small_vector<std::vector<egr::GradSlotMeta>,
egr::kSlotSmallVectorSize> &metas =
node->OutputMeta();
for (size_t i = 0; i < metas.size(); i++) {
for (size_t j = 0; j < metas[i].size(); j++) {
const egr::Edge &edge = metas[i][j].GetEdge();
auto next_node_shared = edge.GetMutableGradNode();
if (!next_node_shared || !next_node_shared.get()) {
continue;
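The hunk above replaces the removed GetEdges() accessor: the backward graph is now walked through each node's OutputMeta(), where every slot meta carries its outgoing edge via GetEdge(). Below is a minimal, self-contained sketch of that traversal shape; Node, SlotMeta and Edge are toy stand-ins, not the real egr types.

// Toy illustration of the traversal pattern in the hunk above: each node
// exposes per-slot metas, and every meta owns the edge to the next node.
#include <memory>
#include <queue>
#include <unordered_set>
#include <vector>

struct Node;

struct Edge {
  std::shared_ptr<Node> next;  // may be null for dangling slots
};

struct SlotMeta {
  Edge edge;
  const Edge& GetEdge() const { return edge; }
};

struct Node {
  std::vector<std::vector<SlotMeta>> metas;  // slot -> fan-out metas
  const std::vector<std::vector<SlotMeta>>& OutputMeta() const {
    return metas;
  }
};

void Traverse(const std::shared_ptr<Node>& root) {
  std::queue<Node*> queue;
  std::unordered_set<Node*> visited;
  queue.push(root.get());
  visited.insert(root.get());
  while (!queue.empty()) {
    Node* node = queue.front();
    queue.pop();
    const auto& metas = node->OutputMeta();
    for (size_t i = 0; i < metas.size(); i++) {
      for (size_t j = 0; j < metas[i].size(); j++) {
        const Edge& edge = metas[i][j].GetEdge();
        auto next = edge.next;
        if (!next) continue;                               // no grad node behind this slot
        if (!visited.insert(next.get()).second) continue;  // already enqueued
        queue.push(next.get());
      }
    }
  }
}

int main() {
  auto leaf = std::make_shared<Node>();
  auto root = std::make_shared<Node>();
  root->metas = {{SlotMeta{Edge{leaf}}}};  // one slot, one edge to the leaf
  Traverse(root);
  return 0;
}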
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/ps/service/heter_client.cc
@@ -23,6 +23,8 @@ DEFINE_int32(switch_send_recv_timeout_s, 600, "switch_send_recv_timeout_s");
namespace paddle {
namespace distributed {
std::shared_ptr<HeterClient> HeterClient::s_instance_ = nullptr;
std::mutex HeterClient::mtx_;
std::shared_ptr<HeterClient> HeterClient::switch_s_instance_ = nullptr;

int GetMicroId(const platform::DeviceContext& ctx,
const framework::Scope* scope) {
24 changes: 16 additions & 8 deletions paddle/fluid/distributed/ps/service/heter_client.h
@@ -169,16 +169,22 @@ class HeterClient {
}

// switch client singleton
static HeterClient& GetSwitchInstance(
static std::shared_ptr<HeterClient> GetSwitchInstance(
const std::vector<std::string>& peer_endpoints, int32_t peer_role) {
static HeterClient switch_s_instance_;
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
if (switch_s_instance_ == nullptr) {
std::unique_lock<std::mutex> lock(mtx_);
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
if (switch_s_instance_ == nullptr) {
switch_s_instance_.reset(new HeterClient());
switch_s_instance_->SetPeerSwitchList(peer_endpoints);
switch_s_instance_->InitClientChannels(false, peer_endpoints,
peer_role);
}
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
switch_s_instance_.SetPeerSwitchList(peer_endpoints);
switch_s_instance_.InitClientChannels(false, peer_endpoints, peer_role);
return switch_s_instance_;
}

@@ -230,6 +236,8 @@ class HeterClient {
HeterClient(const HeterClient&);

static std::shared_ptr<HeterClient> s_instance_;
static std::mutex mtx_;
static std::shared_ptr<HeterClient> switch_s_instance_;
std::vector<std::shared_ptr<brpc::Channel>> xpu_channels_;
std::vector<std::shared_ptr<brpc::Channel>> previous_xpu_channels_;

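The rewritten GetSwitchInstance above turns the switch client into a lazily created, mutex-protected singleton held in a class-level shared_ptr, using a check–lock–check sequence. A stripped-down sketch of that pattern, with a hypothetical Client class standing in for HeterClient:

// Minimal lazy singleton with double-checked locking, mirroring the
// GetSwitchInstance rewrite above. Client is a hypothetical stand-in.
#include <memory>
#include <mutex>
#include <string>
#include <vector>

class Client {
 public:
  static std::shared_ptr<Client> GetInstance(
      const std::vector<std::string>& endpoints) {
    if (instance_ == nullptr) {              // fast path: already built
      std::unique_lock<std::mutex> lock(mtx_);
      if (instance_ == nullptr) {            // re-check while holding the lock
        instance_.reset(new Client());
        instance_->Init(endpoints);          // runs exactly once
      }
    }
    return instance_;
  }

 private:
  Client() = default;
  void Init(const std::vector<std::string>& endpoints) { peers_ = endpoints; }

  static std::shared_ptr<Client> instance_;
  static std::mutex mtx_;
  std::vector<std::string> peers_;
};

std::shared_ptr<Client> Client::instance_ = nullptr;
std::mutex Client::mtx_;

int main() {
  auto c = Client::GetInstance({"127.0.0.1:8500"});
  return c == nullptr;  // 0 on success
}

Strictly speaking, the unguarded first read is a data race in the C++ memory model; std::call_once or an atomic pointer would make the pattern fully conforming, but the sketch keeps the shape used in the diff.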
37 changes: 24 additions & 13 deletions paddle/fluid/distributed/ps/service/heter_server.h 100644 → 100755
@@ -144,31 +144,41 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
brpc::Controller* cntl);

void WaitForVarsConsumed(int32_t group_id, const std::string& var_name) {
timeline_.Start();
// timeline_.Start();
while (true) {
if (vars_ready_flag[group_id][var_name] == 0) {
break;
{
std::lock_guard<std::mutex> lock(scope_mutex_);
if (vars_ready_flag[group_id][var_name] == 0) {
break;
}
}
/*
timeline_.Pause();
if (timeline_.ElapsedSec() > FLAGS_switch_send_recv_timeout_s) {
VLOG(0) << "vars not consumed exceed 10 miniutes";
break;
}
*/
}
return;
}

void WaitForVarsProduced(int32_t group_id, const std::string& var_name) {
timeline_.Start();
// timeline_.Start();
while (true) {
if (vars_ready_flag[group_id][var_name] == 1) {
break;
{
std::lock_guard<std::mutex> lock(scope_mutex_);
if (vars_ready_flag[group_id][var_name] == 1) {
break;
}
}
/*
timeline_.Pause();
if (timeline_.ElapsedSec() > FLAGS_switch_send_recv_timeout_s) {
VLOG(0) << "vars not produced exceed 10 miniutes";
break;
}
*/
}
return;
}
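Both wait loops above now take scope_mutex_ around every read of vars_ready_flag, so the flag table is never inspected while another thread is updating it (the timeline_-based timeout stays commented out). A self-contained sketch of this guarded-polling idea follows; the map, mutex and the short sleep are stand-ins added for the toy loop, and the real code spins without sleeping.

// Guarded polling on a shared ready-flag table, as in WaitForVarsConsumed /
// WaitForVarsProduced above.
#include <chrono>
#include <map>
#include <mutex>
#include <string>
#include <thread>

std::mutex scope_mutex;
std::map<int, std::map<std::string, int>> vars_ready_flag;

// Spin until the flag for (group_id, var_name) reaches `target`, locking
// around each read so writers never race with the check.
void WaitForFlag(int group_id, const std::string& var_name, int target) {
  while (true) {
    {
      std::lock_guard<std::mutex> lock(scope_mutex);
      if (vars_ready_flag[group_id][var_name] == target) break;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));  // back off
  }
}

int main() {
  std::thread producer([] {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    std::lock_guard<std::mutex> lock(scope_mutex);
    vars_ready_flag[0]["grad@micro0"] = 1;  // mark the var as produced
  });
  WaitForFlag(0, "grad@micro0", 1);  // consumer waits for the producer
  producer.join();
  return 0;
}

A std::condition_variable would avoid polling entirely; the sketch keeps the loop structure of the original code.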
@@ -379,12 +389,12 @@ class HeterService : public PsService {
::google::protobuf::Closure* done) {
VLOG(4) << "entering SendToSwitch";
brpc::ClosureGuard done_guard(done);
auto& switch_client_ptr_ =
std::shared_ptr<HeterClient> switch_client_ptr_ =
HeterClient::GetSwitchInstance(peer_endpoints_, PEER_ROLE_IS_SWITCH);
if (switch_client_ptr_.peer_switch_channels_.empty()) {
LOG(ERROR) << "switch_client_ptr_.peer_switch_channels_ null";
if (switch_client_ptr_->peer_switch_channels_.empty()) {
LOG(ERROR) << "switch_client_ptr_->peer_switch_channels_ null";
}
brpc::Channel* channel = switch_client_ptr_.peer_switch_channels_[0].get();
brpc::Channel* channel = switch_client_ptr_->peer_switch_channels_[0].get();
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
// proxy: construct a new OnHeterRpcDone object (or reset one inside the OnHeterRpcDone class)
OnHeterRpcDone* closure2 = new OnHeterRpcDone([](void* done) {
@@ -414,6 +424,7 @@ class HeterService : public PsService {
std_cntl.response_attachment().movable());
fut.wait();
VLOG(4) << "SendToSwitch done";
delete closure2;
}

void SendS2S(::google::protobuf::RpcController* controller,
@@ -446,11 +457,11 @@
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
VLOG(4) << "SendToWorker(client addr) =" << cntl->remote_side();
auto& switch_client_ptr_ =
std::shared_ptr<distributed::HeterClient> switch_client_ptr_ =
HeterClient::GetSwitchInstance(peer_endpoints_, PEER_ROLE_IS_WORKER);
VLOG(4) << "in switch client, peer worker 0: "
<< switch_client_ptr_.peer_worker_list_[0];
brpc::Channel* channel = switch_client_ptr_.peer_worker_channels_[0].get();
<< switch_client_ptr_->peer_worker_list_[0];
brpc::Channel* channel = switch_client_ptr_->peer_worker_channels_[0].get();

auto* closure = reinterpret_cast<OnHeterRpcDone*>(done);
PsService_Stub stub(channel);
16 changes: 10 additions & 6 deletions paddle/fluid/eager/accumulation/accumulation_node.cc
@@ -38,10 +38,13 @@ static void CopyOrAddTensor(paddle::experimental::Tensor* tensor,
}
}

std::vector<std::vector<paddle::experimental::Tensor>> GradNodeAccumulation::
operator()(
std::vector<std::vector<paddle::experimental::Tensor>>& grads, // NOLINT
bool create_graph, bool is_new_grad) {
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
GradNodeAccumulation::operator()(
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& grads, // NOLINT
bool create_graph,
bool is_new_grad) {
VLOG(3) << "Running Eager Backward Node: GradNodeAccumulation";
PADDLE_ENFORCE(grads.size() == 1,
paddle::platform::errors::Fatal(
@@ -56,8 +59,9 @@ operator()(
// Apply Gradient Hooks
paddle::experimental::Tensor grad_out;
if (GradientHooksRegistered()) {
std::vector<std::vector<paddle::experimental::Tensor>> hooked_grads =
ApplyGradientHooks(grads);
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
hooked_grads = ApplyGradientHooks(grads);
grad_out = hooked_grads[0][0];
} else {
grad_out = grads[0][0];
9 changes: 6 additions & 3 deletions paddle/fluid/eager/accumulation/accumulation_node.h
@@ -37,9 +37,12 @@ class GradNodeAccumulation : public GradNodeBase {
}

// Functor: perform backward computations
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
std::vector<std::vector<paddle::experimental::Tensor>>& grads, // NOLINT
bool create_graph = false, bool is_new_grad = false) override;
virtual paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
operator()(paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& grads, // NOLINT
bool create_graph = false,
bool is_new_grad = false) override;

void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }

8 changes: 4 additions & 4 deletions paddle/fluid/eager/amp_utils.h
@@ -21,8 +21,8 @@ namespace egr {

static inline paddle::experimental::DataType GetPromoteType(
const std::string& op_name,
const std::vector<std::vector<paddle::experimental::Tensor>>&
amp_tensors_vector,
const paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& amp_tensors_vector,
const paddle::experimental::DataType& amp_dtype) {
auto dst_type = amp_dtype;
if (egr::Controller::Instance().GetCurrentTracer()->GetAmpDtype() ==
@@ -86,8 +86,8 @@ static inline paddle::experimental::DataType GetPromoteType(

inline paddle::experimental::DataType GetAmpDestDtype(
const std::string& op_name,
const std::vector<std::vector<paddle::experimental::Tensor>>&
amp_tensors_vector) {
const paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& amp_tensors_vector) {
auto amp_dtype =
egr::Controller::Instance().GetCurrentTracer()->GetAmpDtype();
auto amp_level = egr::Controller::Instance().GetAMPLevel();
@@ -144,27 +144,34 @@ void GradNodeScale::SetTensorWrappers_X(

void GradNodeScale::SetAttributes_scale(float scale) { scale_ = scale; }

std::vector<std::vector<paddle::experimental::Tensor>> GradNodeScale::
operator()(
std::vector<std::vector<paddle::experimental::Tensor>>& grads, // NOLINT
bool create_graph, bool is_new_grad) {
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
GradNodeScale::operator()(
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& grads, // NOLINT
bool create_graph,
bool is_new_grad) {
// 1. Check Output Size
VLOG(6) << "grad size is: " << grads.size();
PADDLE_ENFORCE(
((grads.size() == 1) && (grads[0].size() == 1)),
paddle::platform::errors::Fatal(
"ScaleGradNode takes exactly 1 grad tensor."
"However received: %d",
"This indicates an issue with Eager Dygraph Backward logic",
grads.size()));
std::vector<std::vector<paddle::experimental::Tensor>> outs;
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
outs;
// 2. Create needed out pattern
paddle::experimental::Tensor out;
// Apply Gradient Hooks
if (GradientHooksRegistered()) {
// TODO(jiabin): Shall we apply hook slot by slot here or accept
// vector<vector<phi::tensor>> to apply all hooks?
std::vector<std::vector<paddle::experimental::Tensor>> hooked_grads =
ApplyGradientHooks(grads);
paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
hooked_grads = ApplyGradientHooks(grads);
ScaleAPI(/* slot by slot set */ hooked_grads[0][0], scale_, 0.0 /* bias */,
true /* bias_after_scale */, &out);
} else {
@@ -38,17 +38,20 @@ class GradNodeScale : public GradNodeBase {
~GradNodeScale() override = default;

// Functor: perform backward computations
virtual std::vector<std::vector<paddle::experimental::Tensor>> operator()(
std::vector<std::vector<paddle::experimental::Tensor>>& grads, // NOLINT
bool create_graph = false, bool is_new_grad = false) override;
virtual paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>
operator()(paddle::small_vector<std::vector<paddle::experimental::Tensor>,
kSlotSmallVectorSize>& grads, // NOLINT
bool create_graph = false,
bool is_new_grad = false) override;

void ClearTensorWrappers() override { VLOG(6) << "Do nothing here now"; }

void SetTensorWrappers_X(
const std::vector<paddle::experimental::Tensor>& tensors);

void SetAttributes_scale(float scale);
std::string name() override { return ""; }
std::string name() override { return "scale node"; }
// Members: define fwd input tensors
// For Scale there is no fwd input tensor needed

@@ -79,9 +79,6 @@ paddle::experimental::Tensor scale(const paddle::experimental::Tensor& x,
// Pass Attributes to GradNode
scale_node->SetAttributes_scale(scale);

// Set Next Edges
scale_node->AddEdges(p_autograd_in, /*slot id*/ 0);

// Set TensorWrappers
scale_node->SetTensorWrappers_X({x});

3 changes: 2 additions & 1 deletion paddle/fluid/eager/api/utils/global_utils.h
@@ -19,8 +19,9 @@
#include <memory>
#include "paddle/fluid/imperative/tracer.h"
#include "paddle/phi/api/ext/op_meta_info.h"
#include "paddle/utils/small_vector.h"
namespace egr {

constexpr size_t kSlotSmallVectorSize = 15U;
class UniqueNameGenerator {
public:
explicit UniqueNameGenerator(std::string prefix = "") : prefix_(prefix) {}
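global_utils.h now defines kSlotSmallVectorSize = 15, the inline capacity used by every signature in this commit that moves from std::vector<std::vector<Tensor>> to paddle::small_vector. A small sketch of why that matters — slot counts up to 15 stay in the object's inline buffer with no heap allocation — assuming it is compiled inside the Paddle tree so paddle/utils/small_vector.h resolves; the string payload type is a stand-in for Tensor.

// Sketch only: illustrates the inline-capacity behaviour behind the
// std::vector -> paddle::small_vector change in this commit.
#include <cstddef>
#include <string>
#include <vector>

#include "paddle/utils/small_vector.h"

constexpr std::size_t kSlotSmallVectorSize = 15U;  // mirrors egr::kSlotSmallVectorSize

int main() {
  // Up to 15 slots live inline; pushing more would transparently spill to
  // the heap, like llvm::SmallVector.
  paddle::small_vector<std::vector<std::string>, kSlotSmallVectorSize> slots;
  for (std::size_t i = 0; i < 4; ++i) {
    slots.push_back({"grad_" + std::to_string(i)});
  }
  return slots.size() == 4 ? 0 : 1;
}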
31 changes: 15 additions & 16 deletions paddle/fluid/eager/auto_code_generator/eager_generator.cc
@@ -1187,11 +1187,6 @@ static std::string GenerateGradNodeCreationContent(
grad_node_creation_str += paddle::string::Sprintf(
SET_GRAD_OUT_META_TEMPLATE, input_name, input_position);

const char* ADD_EDGES_TEMPLATE =
" if(%s) grad_node->AddEdges(%s, %d);\n";
grad_node_creation_str +=
paddle::string::Sprintf(ADD_EDGES_TEMPLATE, input_autograd_name,
input_autograd_name, input_position);
} else {
compute_require_grad_args += ", &" + input_autograd_name;
size_t input_position = fwd_inputs_name_pos_map.at(input_name);
@@ -1200,10 +1195,6 @@
" grad_node->SetGradOutMeta(%s, %d);\n";
grad_node_creation_str += paddle::string::Sprintf(
SET_GRAD_OUT_META_TEMPLATE, input_name, input_position);

const char* ADD_EDGES_TEMPLATE = " grad_node->AddEdges(&%s, %d);\n";
grad_node_creation_str += paddle::string::Sprintf(
ADD_EDGES_TEMPLATE, input_autograd_name, input_position);
}
}

@@ -1649,7 +1640,8 @@ static std::pair<std::string, std::string> GenerateForwardFunctionContents(
std::string amp_logic_str = "";
if (in_vars.size() != 0) {
const char* AMP_TENSORS_VECTOR_TEMPLATE =
" std::vector<std::vector<paddle::experimental::Tensor>> "
" paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize> "
"amp_tensors_vector = { "
"%s };\n";
std::string amp_tensors_vector = paddle::string::Sprintf(
@@ -2428,9 +2420,11 @@
}

const char* BWD_RETURN_TEMPLATE =
" std::vector<std::vector<paddle::experimental::Tensor>> hooked_grads = "
" paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize> hooked_grads = "
"GradNode%s::ApplyGradientHooks(grads);\n"
" std::vector<std::vector<paddle::experimental::Tensor>> outputs(%d);\n"
" paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize> outputs(%d);\n"
" %s\n"
" if(NeedComplexToRealConversion()) "
"HandleComplexGradToRealGrad(&outputs);\n"
Expand All @@ -2441,9 +2435,11 @@ static std::string GenerateGradNodeCCContents(

// [Generation] Get Full Grad Function
const char* GRAD_FUNCTION_TEMPLATE =
"std::vector<std::vector<paddle::experimental::Tensor>> "
"paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize> "
"GradNode%s::operator()("
"std::vector<std::vector<paddle::experimental::Tensor>>& grads, bool "
"paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize>& grads, bool "
"create_graph, bool is_new_grad) {\n"
"%s"
"%s"
@@ -2487,9 +2483,12 @@ static std::string GenerateGradNodeHeaderContents(
"Construct GradNode%s \"; }\n"
" ~GradNode%s() override { VLOG(6) << \" Destruct GradNode%s \"; }\n"
"\n"
" virtual std::vector<std::vector<paddle::experimental::Tensor>> "
" virtual "
"paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize> "
"operator()("
"std::vector<std::vector<paddle::experimental::Tensor>>& grads, bool "
"paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
"egr::kSlotSmallVectorSize>& grads, bool "
"create_graph = false, bool is_new_grad = false) "
"override;\n"
"\n"
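To make the generator changes above concrete, the tiny standalone program below substitutes a hypothetical node name ("relu", chosen only for illustration) into a simplified copy of GRAD_FUNCTION_TEMPLATE and prints the operator() signature the code generator would now emit; it does not run the real generator.

// Prints roughly what the updated GRAD_FUNCTION_TEMPLATE expands to for a
// hypothetical GradNoderelu. Simplified copy of the template; illustration only.
#include <cstdio>

int main() {
  const char* kGradFunctionTemplate =
      "paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
      "egr::kSlotSmallVectorSize> "
      "GradNode%s::operator()("
      "paddle::small_vector<std::vector<paddle::experimental::Tensor>, "
      "egr::kSlotSmallVectorSize>& grads, "
      "bool create_graph, bool is_new_grad) {\n"
      "  // ... generated body ...\n"
      "}\n";
  std::printf(kGradFunctionTemplate, "relu");
  return 0;
}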
