Skip to content

Commit

Permalink
Merge branch 'develop' into heaviside_3
Browse files Browse the repository at this point in the history
  • Loading branch information
BrilliantYuKaimin committed Apr 21, 2022
2 parents 65a0fde + 5c73822 commit 4d6cc60
Show file tree
Hide file tree
Showing 398 changed files with 13,689 additions and 3,017 deletions.
6 changes: 5 additions & 1 deletion cmake/cuda.cmake
Expand Up @@ -132,7 +132,11 @@ function(select_nvcc_arch_flags out_variable)
elseif(${CUDA_ARCH_NAME} STREQUAL "Turing")
set(cuda_arch_bin "75")
elseif(${CUDA_ARCH_NAME} STREQUAL "Ampere")
set(cuda_arch_bin "80")
if (${CMAKE_CUDA_COMPILER_VERSION} LESS 11.1) # CUDA 11.0
set(cuda_arch_bin "80")
elseif (${CMAKE_CUDA_COMPILER_VERSION} LESS 12.0) # CUDA 11.1+
set(cuda_arch_bin "80 86")
endif()
elseif(${CUDA_ARCH_NAME} STREQUAL "All")
set(cuda_arch_bin ${paddle_known_gpu_archs})
elseif(${CUDA_ARCH_NAME} STREQUAL "Auto")
Expand Down
3 changes: 1 addition & 2 deletions cmake/external/cinn.cmake
Expand Up @@ -26,7 +26,7 @@ add_definitions(-w)
######################################
include(ExternalProject)
set(CINN_PREFIX_DIR ${THIRD_PARTY_PATH}/CINN)
set(CINN_GIT_TAG 1fd85187b6c18da4dd51f22619d093ef08d61b01)
set(CINN_GIT_TAG eedb801ca39bfc6b9621bc76c24a0bf98cb8404b)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION}
-DWITH_CUDA=${WITH_GPU}
-DWITH_CUDNN=${WITH_GPU}
Expand Down Expand Up @@ -85,4 +85,3 @@ add_library(cinn SHARED IMPORTED GLOBAL)
set_target_properties(cinn PROPERTIES IMPORTED_LOCATION "${CINN_LIB_LOCATION}/${CINN_LIB_NAME}")
include_directories(${CINN_INCLUDE_DIR})
add_dependencies(cinn external_cinn)

2 changes: 1 addition & 1 deletion cmake/external/lite.cmake
Expand Up @@ -50,7 +50,7 @@ if (NOT LITE_SOURCE_DIR OR NOT LITE_BINARY_DIR)
set(LITE_INSTALL_DIR ${THIRD_PARTY_PATH}/install/lite)

if(NOT LITE_GIT_TAG)
set(LITE_GIT_TAG 4ab64daecc11fbf74fffdc6a4733f388472e7d5d)
set(LITE_GIT_TAG 81ef66554099800c143a0feff6e0a491b3b0d12e)
endif()

if(NOT CUDA_ARCH_NAME)
Expand Down
11 changes: 10 additions & 1 deletion cmake/inference_lib.cmake
Expand Up @@ -248,15 +248,24 @@ copy(inference_lib_dist
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/phi/common/*.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/common/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/phi/core/macros.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/core/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/phi/core/visit_type.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/core/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/any.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/optional.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
copy(inference_lib_dist
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/none.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/utils/flat_hash_map.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/utils/)
copy(inference_lib_dist
SRCS ${PADDLE_SOURCE_DIR}/paddle/extension.h
DSTS ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/)
Expand Down
9 changes: 6 additions & 3 deletions cmake/neuware.cmake
Expand Up @@ -17,13 +17,16 @@ INCLUDE_DIRECTORIES(${NEUWARE_INCLUDE_DIR})
set(CNNL_LIB ${NEUWARE_LIB_DIR}/libcnnl.so)
set(CNRT_LIB ${NEUWARE_LIB_DIR}/libcnrt.so)
set(CNDRV_LIB ${NEUWARE_LIB_DIR}/libcndrv.so)
set(CNPAPI_LIB ${NEUWARE_LIB_DIR}/libcnpapi.so)

generate_dummy_static_lib(LIB_NAME "neuware_lib" GENERATOR "neuware.cmake")
set(NEUWARE_LIB_DEPS ${CNNL_LIB} ${CNRT_LIB} ${CNDRV_LIB} ${CNPAPI_LIB})

if(WITH_CNCL)
MESSAGE(STATUS "Compile with CNCL!")
ADD_DEFINITIONS(-DPADDLE_WITH_CNCL)
set(CNCL_LIB ${NEUWARE_LIB_DIR}/libcncl.so)
TARGET_LINK_LIBRARIES(neuware_lib ${CNCL_LIB} ${CNNL_LIB} ${CNRT_LIB} ${CNDRV_LIB})
else()
TARGET_LINK_LIBRARIES(neuware_lib ${CNNL_LIB} ${CNRT_LIB} ${CNDRV_LIB})
list(APPEND NEUWARE_LIB_DEPS ${CNCL_LIB})
endif()

TARGET_LINK_LIBRARIES(neuware_lib ${NEUWARE_LIB_DEPS})
3 changes: 2 additions & 1 deletion cmake/phi_header.cmake
Expand Up @@ -36,7 +36,8 @@ phi_header_path_compat(${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experiment
phi_header_path_compat(${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/api/ext)
phi_header_path_compat(${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/api/include)
phi_header_path_compat(${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/common)
phi_header_path_compat(${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/phi/core)

# In order to be compatible with the original behavior, the header file name needs to be changed
file(RENAME ${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/extension.h
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/ext_all.h)
${PADDLE_INFERENCE_INSTALL_DIR}/paddle/include/experimental/ext_all.h)
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroup.cc
Expand Up @@ -35,8 +35,9 @@ bool ProcessGroup::Task::Wait(std::chrono::milliseconds timeout) {

void ProcessGroup::Task::Synchronize() {}

ProcessGroup::ProcessGroup(int rank, int size, int gid)
: rank_(rank), size_(size), gid_(gid) {
ProcessGroup::ProcessGroup(int rank, int size, const platform::Place& place,
int gid)
: rank_(rank), size_(size), place_(place), gid_(gid) {
if (gid != IGNORE_ID) {
auto map = ProcessGroupMapFromGid::getInstance();
map->insert(gid_, this);
Expand Down
4 changes: 3 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroup.h
Expand Up @@ -69,7 +69,8 @@ class ProcessGroup {
bool is_completed_ = false;
};

explicit ProcessGroup(int rank, int size, int gid);
explicit ProcessGroup(int rank, int size, const platform::Place& place,
int gid);
virtual ~ProcessGroup() {}

int GetRank() const { return rank_; }
Expand Down Expand Up @@ -145,6 +146,7 @@ class ProcessGroup {
protected:
const int rank_;
const int size_;
const platform::Place place_;
const int gid_;
};

Expand Down
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroupGloo.cc
Expand Up @@ -165,8 +165,9 @@ ProcessGroupGloo::GlooTask::GlooTask(

ProcessGroupGloo::ProcessGroupGloo(
const std::shared_ptr<distributed::Store>& store, int rank, int world_size,
int gid, const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size, gid),
const platform::Place& place, int gid,
const std::shared_ptr<GlooOptions> options)
: ProcessGroup(rank, world_size, place, gid),
_tag(0),
_store(new GlooStore(store)) {
_context = std::make_shared<gloo::rendezvous::Context>(rank, world_size);
Expand Down
3 changes: 2 additions & 1 deletion paddle/fluid/distributed/collective/ProcessGroupGloo.h
Expand Up @@ -102,7 +102,8 @@ class ProcessGroupGloo : public ProcessGroup {

explicit ProcessGroupGloo(
const std::shared_ptr<paddle::distributed::Store>& store, int rank,
int world_size, int gid, std::shared_ptr<GlooOptions> options);
int world_size, const platform::Place& place, int gid,
std::shared_ptr<GlooOptions> options);

~ProcessGroupGloo() = default;

Expand Down
8 changes: 6 additions & 2 deletions paddle/fluid/distributed/collective/ProcessGroupHCCL.cc
Expand Up @@ -17,6 +17,7 @@
#include "paddle/fluid/distributed/collective/HCCLTools.h"
#include "paddle/fluid/memory/malloc.h"
#include "paddle/fluid/platform/device/npu/hccl_helper.h"
#include "paddle/fluid/platform/device/npu/npu_info.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
Expand Down Expand Up @@ -97,8 +98,11 @@ bool ProcessGroupHCCL::HCCLTask::Wait(std::chrono::milliseconds timeout) {
void ProcessGroupHCCL::HCCLTask::Synchronize() { Wait(kWaitTimeout); }

ProcessGroupHCCL::ProcessGroupHCCL(const std::shared_ptr<Store>& store,
int rank, int size, int gid)
: ProcessGroup(rank, size, gid), store_(store) {}
int rank, int size,
const platform::Place& place, int gid)
: ProcessGroup(rank, size, place, gid), store_(store) {
platform::SetNPUDeviceId(place_.device);
}

void ProcessGroupHCCL::BroadcastUniqueHCCLID(
std::vector<HcclRootInfo>& hccl_ids) { // NOLINT
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/ProcessGroupHCCL.h
Expand Up @@ -71,7 +71,7 @@ class ProcessGroupHCCL : public ProcessGroup {
};

ProcessGroupHCCL(const std::shared_ptr<Store>& store, int rank, int size,
int gid);
const platform::Place& place, int gid);

const std::string GetBackendName() const override {
return std::string(HCCL_BACKEND_NAME);
Expand Down
20 changes: 9 additions & 11 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.cc
Expand Up @@ -44,13 +44,11 @@ bool ProcessGroupHeter::HeterTask::Wait(std::chrono::milliseconds timeout) {
return true;
}

ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
int rank, int size, int gid,
int local_rank, int local_size,
int gloo_rank, int gloo_size,
bool with_switch,
std::string switch_endpoint)
: ProcessGroup(rank, size, gid),
ProcessGroupHeter::ProcessGroupHeter(
const std::shared_ptr<Store>& store, int rank, int size,
const platform::Place& place, int gid, int local_rank, int local_size,
int gloo_rank, int gloo_size, bool with_switch, std::string switch_endpoint)
: ProcessGroup(rank, size, place, gid),
store_(store),
local_rank_(local_rank),
local_size_(local_size),
Expand All @@ -60,19 +58,19 @@ ProcessGroupHeter::ProcessGroupHeter(const std::shared_ptr<Store>& store,
switch_endpoint_(switch_endpoint) {
#if defined(PADDLE_WITH_NCCL)
inner_pg_ = std::make_shared<ProcessGroupNCCL>(store, local_rank, local_size,
IGNORE_ID);
place_, IGNORE_ID);
#elif defined(PADDLE_WITH_ASCEND_CL)
inner_pg_ = std::make_shared<ProcessGroupHCCL>(store, local_rank, local_size,
IGNORE_ID);
place_, IGNORE_ID);
#else
PADDLE_THROW(platform::errors::Fatal(
"ProcessGroupHeter only supports NCCL and HCCL now.");
#endif
if (local_rank_ == 0 && !with_switch_) {
auto opts = ProcessGroupGloo::GlooOptions::create();
opts->device = ProcessGroupGloo::createDefaultDevice();
inter_pg_ = std::make_shared<ProcessGroupGloo>(store, gloo_rank_,
gloo_size_, IGNORE_ID, opts);
inter_pg_ = std::make_shared<ProcessGroupGloo>(
store, gloo_rank_, gloo_size_, place_, IGNORE_ID, opts);
}
}

Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/distributed/collective/ProcessGroupHeter.h
Expand Up @@ -81,9 +81,9 @@ class ProcessGroupHeter : public ProcessGroup {
};

ProcessGroupHeter(const std::shared_ptr<Store>& store, int rank, int size,
int gid, int local_rank, int local_size, int gloo_rank,
int gloo_size, bool with_switch,
std::string switch_endpoints);
const platform::Place& place, int gid, int local_rank,
int local_size, int gloo_rank, int gloo_size,
bool with_switch, std::string switch_endpoints);

const std::string GetBackendName() const override {
return std::string(HETER_BACKEND_NAME);
Expand Down
27 changes: 9 additions & 18 deletions paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
Expand Up @@ -14,6 +14,7 @@

#include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
#include "paddle/fluid/distributed/collective/Common.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/platform/device/gpu/nccl_helper.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/phi/api/include/api.h"
Expand Down Expand Up @@ -103,8 +104,11 @@ bool ProcessGroupNCCL::NCCLTask::Wait(std::chrono::milliseconds timeout) {
void ProcessGroupNCCL::NCCLTask::Synchronize() { Wait(kWaitTimeout); }

ProcessGroupNCCL::ProcessGroupNCCL(const std::shared_ptr<Store>& store,
int rank, int size, int gid)
: ProcessGroup(rank, size, gid), store_(store) {}
int rank, int size,
const platform::Place& place, int gid)
: ProcessGroup(rank, size, place, gid), store_(store) {
platform::SetDeviceId(place_.device);
}

void ProcessGroupNCCL::BroadcastUniqueNCCLID(
std::vector<ncclUniqueId>& nccl_ids) { // NOLINT
Expand Down Expand Up @@ -349,29 +353,16 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(

std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
const BarrierOptions& opts) {
std::vector<phi::GPUPlace> places;

if (!opts.place_ids.empty()) {
for (auto place_id : opts.place_ids) {
places.emplace_back(place_id);
}
} else if (!used_place_ids_.empty()) {
for (auto place_id : used_place_ids_) {
places.emplace_back(place_id);
}
} else {
auto numGPUs = GetSize();
int place_id = static_cast<int>(rank_ % numGPUs);
places.emplace_back(place_id);
}
// Only support single card single process
std::vector<phi::GPUPlace> places = {place_};

std::vector<phi::DenseTensor> barrierTensors;
barrierTensors.reserve(places.size());

platform::CUDADeviceGuard gpuGuard;
for (auto& place : places) {
gpuGuard.SetDeviceIndex(place.GetDeviceId());
auto dt = full({1}, 0, phi::DataType::FLOAT32, phi::GPUPlace());
auto dt = full({1}, 0, phi::DataType::FLOAT32, place);
barrierTensors.push_back(
*std::dynamic_pointer_cast<phi::DenseTensor>(dt.impl()));
}
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/collective/ProcessGroupNCCL.h
Expand Up @@ -77,7 +77,7 @@ class ProcessGroupNCCL : public ProcessGroup {
};

ProcessGroupNCCL(const std::shared_ptr<Store>& store, int rank, int size,
int gid);
const platform::Place& place, int gid);

const std::string GetBackendName() const override {
return std::string(NCCL_BACKEND_NAME);
Expand Down
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/fleet_executor/carrier.cc
Expand Up @@ -186,7 +186,13 @@ int64_t Carrier::GetRank(int64_t interceptor_id) const {
}

bool Carrier::Send(const InterceptorMessage& msg) {
int64_t src_id = (msg.src_id() == -1) ? msg.dst_id() : msg.src_id();
int64_t src_id = msg.src_id();
// TODO(liyurui): compatible solution, will be removed completely in the
// future
if (interceptor_id_to_rank_.find(src_id) == interceptor_id_to_rank_.end() &&
src_id == SOURCE_ID) {
src_id = msg.dst_id();
}
int64_t dst_id = msg.dst_id();
int64_t src_rank = GetRank(src_id);
int64_t dst_rank = GetRank(dst_id);
Expand Down
Expand Up @@ -161,7 +161,7 @@ void ComputeInterceptor::ReplyCompletedToUpStream() {
VLOG(3) << "ComputeInterceptor " << interceptor_id_
<< " Reply data_is_useless msg to " << up_id
<< " for step: " << step_;
if (up_id == -1) return;
if (is_source_ && up_id == -1) return;

InterceptorMessage reply_msg;
reply_msg.set_message_type(DATA_IS_USELESS);
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/fleet_executor/dist_model.cc
Expand Up @@ -168,7 +168,7 @@ bool DistModel::Init() {
if (!PrepareFeedAndFetch()) {
return false;
}
if (!CommInit()) {
if (config_.nranks > 1 && !CommInit()) {
return false;
}
if (!PrepareFleetExe()) {
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/fleet_executor/interceptor.h
Expand Up @@ -40,6 +40,9 @@ class TaskNode;
class Carrier;
class TaskLoop;

constexpr int64_t SOURCE_ID = -1;
constexpr int64_t SINK_ID = -2;

class Interceptor {
public:
using MsgHandle = std::function<void(const InterceptorMessage&)>;
Expand Down
Expand Up @@ -27,8 +27,8 @@ enum MessageType {
}

message InterceptorMessage {
optional int64 src_id = 1 [ default = 0 ];
optional int64 dst_id = 2 [ default = 0 ];
optional sint64 src_id = 1 [ default = 0 ];
optional sint64 dst_id = 2 [ default = 0 ];
optional MessageType message_type = 3 [ default = RESET ];
optional bool ctrl_message = 4 [ default = false ];
optional int64 scope_idx = 5 [ default = 0 ];
Expand Down
Expand Up @@ -30,7 +30,7 @@ SinkInterceptor::SinkInterceptor(int64_t interceptor_id, TaskNode* node)
void SinkInterceptor::StopCarrierIfComplete() {
bool flag = true;
for (const auto& up : upstream_step_) {
flag = flag & (up.second == max_run_times_);
flag = flag && (up.second == max_run_times_);
}
if (flag) {
VLOG(3) << "Sink Interceptor is stopping carrier";
Expand Down
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.cc
Expand Up @@ -74,6 +74,9 @@ void TaskNode::Init(bool use_feed_fetch_ops) {
}
}

TaskNode::TaskNode(int64_t rank, int64_t task_id, int64_t max_run_times)
: rank_(rank), task_id_(task_id), max_run_times_(max_run_times) {}

TaskNode::TaskNode(int32_t role,
const std::vector<framework::OpDesc*>& op_descs,
int64_t rank, int64_t task_id, int64_t max_run_times,
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/distributed/fleet_executor/task_node.h
Expand Up @@ -32,6 +32,7 @@ namespace distributed {
class TaskNode final {
public:
using OperatorBase = paddle::framework::OperatorBase;
TaskNode(int64_t rank, int64_t task_id, int64_t max_run_times);
TaskNode(int32_t role, int64_t rank, int64_t task_id, int64_t max_run_times,
int64_t max_slot_nums);
TaskNode(int32_t role, const std::vector<framework::OpDesc*>& op_descs,
Expand Down

0 comments on commit 4d6cc60

Please sign in to comment.