Commit

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into rrelu
thunder95 committed May 14, 2022
2 parents 2132bf5 + 2eacef4 commit 116873b
Showing 472 changed files with 16,572 additions and 3,634 deletions.
9 changes: 7 additions & 2 deletions CMakeLists.txt
@@ -256,8 +256,8 @@ option(WITH_CUSTOM_DEVICE "Compile with custom device support" OFF)
 option(WITH_ARM_BRPC "Supprot Brpc in Arm" OFF)
 
 if(WITH_RECORD_BUILDTIME)
-  set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh")
-  set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh")
+  set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}")
+  set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK "${CMAKE_CURRENT_SOURCE_DIR}/tools/get_build_time.sh ${CMAKE_CURRENT_BINARY_DIR}")
 else()
   include(ccache) # set ccache for compilation ; if WITH_RECORD_BUILDTIME=ON can't use ccache
 endif()
@@ -395,6 +395,11 @@ if(WITH_DISTRIBUTE)
     MESSAGE(WARNING "Disable WITH_PSCORE when compiling with NPU. Force WITH_PSCORE=OFF.")
     set(WITH_PSCORE OFF CACHE BOOL "Disable WITH_PSCORE when compiling with NPU" FORCE)
   endif()
+  if(WITH_ROCM AND HIP_VERSION LESS_EQUAL 40020496)
+    # TODO(qili93): third-party rocksdb throw Illegal instruction with HIP version 40020496
+    MESSAGE(WARNING "Disable WITH_PSCORE when HIP_VERSION is less than or equal 40020496. Force WITH_PSCORE=OFF.")
+    set(WITH_PSCORE OFF CACHE BOOL "Disable WITH_PSCORE when HIP_VERSION is less than or equal 40020496" FORCE)
+  endif()
 endif()
 
 include(third_party) # download, build, install third_party, Contains about 20+ dependencies
2 changes: 1 addition & 1 deletion README.md
@@ -20,7 +20,7 @@ PaddlePaddle is originated from industrial practices with dedication and commitment
 
 ## Installation
 
-### Latest PaddlePaddle Release: [v2.2](https://github.com/PaddlePaddle/Paddle/tree/release/2.2)
+### Latest PaddlePaddle Release: [v2.3](https://github.com/PaddlePaddle/Paddle/tree/release/2.3)
 
 Our vision is to enable deep learning for everyone via PaddlePaddle.
 Please refer to our [release announcement](https://github.com/PaddlePaddle/Paddle/releases) to track the latest features of PaddlePaddle.
2 changes: 1 addition & 1 deletion cmake/external/mkldnn.cmake
@@ -19,7 +19,7 @@ SET(MKLDNN_PREFIX_DIR ${THIRD_PARTY_PATH}/mkldnn)
 SET(MKLDNN_INSTALL_DIR ${THIRD_PARTY_PATH}/install/mkldnn)
 SET(MKLDNN_INC_DIR "${MKLDNN_INSTALL_DIR}/include" CACHE PATH "mkldnn include directory." FORCE)
 SET(MKLDNN_REPOSITORY ${GIT_URL}/oneapi-src/oneDNN.git)
-SET(MKLDNN_TAG 9a35435c18722ff17a48fb60bceac42bfdf78754)
+SET(MKLDNN_TAG 9b186765dded79066e0cd9c17eb70b680b76fb8e)
 
 
 # Introduce variables:
4 changes: 2 additions & 2 deletions cmake/external/xpu.cmake
@@ -9,15 +9,15 @@ SET(XPU_RT_LIB_NAME "libxpurt.so")
 
 if(NOT DEFINED XPU_BASE_URL)
   SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
-  SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220425")
+  SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20220510")
 else()
   SET(XPU_BASE_URL "${XPU_BASE_URL}")
 endif()
 
 # ubuntu and centos: use output by XDNN API team
 if(NOT DEFINED XPU_XDNN_BASE_URL)
   SET(XPU_XDNN_BASE_URL_WITHOUT_DATE "https://klx-sdk-release-public.su.bcebos.com/xdnn/dev")
-  SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220425")
+  SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL_WITHOUT_DATE}/20220510")
 else()
   SET(XPU_XDNN_BASE_URL "${XPU_XDNN_BASE_URL}")
 endif()
29 changes: 29 additions & 0 deletions cmake/hip.cmake
@@ -18,6 +18,33 @@ include_directories(${ROCM_PATH}/include)
 message(STATUS "HIP version: ${HIP_VERSION}")
 message(STATUS "HIP_CLANG_PATH: ${HIP_CLANG_PATH}")
 
+macro(find_hip_version hip_header_file)
+  file(READ ${hip_header_file} HIP_VERSION_FILE_CONTENTS)
+
+  string(REGEX MATCH "define HIP_VERSION_MAJOR +([0-9]+)" HIP_MAJOR_VERSION
+         "${HIP_VERSION_FILE_CONTENTS}")
+  string(REGEX REPLACE "define HIP_VERSION_MAJOR +([0-9]+)" "\\1"
+         HIP_MAJOR_VERSION "${HIP_MAJOR_VERSION}")
+  string(REGEX MATCH "define HIP_VERSION_MINOR +([0-9]+)" HIP_MINOR_VERSION
+         "${HIP_VERSION_FILE_CONTENTS}")
+  string(REGEX REPLACE "define HIP_VERSION_MINOR +([0-9]+)" "\\1"
+         HIP_MINOR_VERSION "${HIP_MINOR_VERSION}")
+  string(REGEX MATCH "define HIP_VERSION_PATCH +([0-9]+)" HIP_PATCH_VERSION
+         "${HIP_VERSION_FILE_CONTENTS}")
+  string(REGEX REPLACE "define HIP_VERSION_PATCH +([0-9]+)" "\\1"
+         HIP_PATCH_VERSION "${HIP_PATCH_VERSION}")
+
+  if(NOT HIP_MAJOR_VERSION)
+    set(HIP_VERSION "???")
+    message(WARNING "Cannot find HIP version in ${HIP_PATH}/include/hip/hip_version.h")
+  else()
+    math(EXPR HIP_VERSION "${HIP_MAJOR_VERSION} * 10000000 + ${HIP_MINOR_VERSION} * 100000 + ${HIP_PATCH_VERSION}")
+    message(STATUS "Current HIP header is ${HIP_PATH}/include/hip/hip_version.h "
+            "Current HIP version is v${HIP_MAJOR_VERSION}.${HIP_MINOR_VERSION}.${HIP_PATCH_VERSION}. ")
+  endif()
+endmacro()
+find_hip_version(${HIP_PATH}/include/hip/hip_version.h)
+
 macro(find_package_and_include PACKAGE_NAME)
   find_package("${PACKAGE_NAME}" REQUIRED)
   include_directories("${ROCM_PATH}/${PACKAGE_NAME}/include")
@@ -71,8 +98,10 @@ set(HIP_CLANG_FLAGS ${HIP_CXX_FLAGS})
 # host linker to link.
 list(APPEND HIP_HCC_FLAGS -fno-gpu-rdc)
 list(APPEND HIP_HCC_FLAGS --amdgpu-target=gfx906)
+list(APPEND HIP_HCC_FLAGS --amdgpu-target=gfx908)
 list(APPEND HIP_CLANG_FLAGS -fno-gpu-rdc)
 list(APPEND HIP_CLANG_FLAGS --amdgpu-target=gfx906)
+list(APPEND HIP_CLANG_FLAGS --amdgpu-target=gfx908)
 
 
 if(HIP_COMPILER STREQUAL clang)
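
Note on the hunk above: find_hip_version packs the three components from hip_version.h into one integer, HIP_VERSION = major * 10000000 + minor * 100000 + patch, which is the value the new `HIP_VERSION LESS_EQUAL 40020496` guard in CMakeLists.txt compares against; 40020496 decodes to HIP v4.0, patch 20496. A minimal C++ sketch of the same arithmetic (standalone and illustrative only, not Paddle code):

    #include <cstdio>

    // Mirrors math(EXPR HIP_VERSION ...) in cmake/hip.cmake.
    long encode_hip_version(long major, long minor, long patch) {
      return major * 10000000L + minor * 100000L + patch;
    }

    int main() {
      // 4 * 10000000 + 0 * 100000 + 20496 = 40020496, the rocksdb
      // "Illegal instruction" cutoff used by the WITH_PSCORE guard.
      long gate = encode_hip_version(4, 0, 20496);
      std::printf("gate=%ld, disable_pscore=%d\n", gate, gate <= 40020496L);
      return 0;
    }
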
2 changes: 1 addition & 1 deletion cmake/inference_lib.cmake
@@ -416,7 +416,7 @@ function(version version_file)
   endif()
   if(WITH_ROCM)
     file(APPEND ${version_file}
-         "HIP version: ${HIP_VERSION}\n"
+         "HIP version: v${HIP_MAJOR_VERSION}.${HIP_MINOR_VERSION}\n"
          "MIOpen version: v${MIOPEN_MAJOR_VERSION}.${MIOPEN_MINOR_VERSION}\n")
   endif()
   if(WITH_ASCEND_CL)
2 changes: 1 addition & 1 deletion cmake/third_party.cmake
@@ -357,7 +357,7 @@ if (WITH_PSCORE)
   include(external/libmct) # download, build, install libmct
   list(APPEND third_party_deps extern_libmct)
 
-  include(external/rocksdb) # download, build, install libmct
+  include(external/rocksdb) # download, build, install rocksdb
   list(APPEND third_party_deps extern_rocksdb)
 endif()
 
59 changes: 55 additions & 4 deletions paddle/fluid/distributed/collective/reducer.cc
@@ -901,6 +901,9 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
 
   dev_ctx->Wait();
 
+  Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
+  std::vector<int64_t> dst_shape = src_value_tensor.shape();
+
   if (std::all_of(cpu_rows_num_ptr, cpu_rows_num_ptr + size_,
                   [&](int64_t row) { return row == cpu_rows_num_ptr[0]; })) {
     // During sparse communication, the number of each card is same.
@@ -940,8 +943,6 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
                                       &dst_rows_vector);
     dev_ctx->Wait();
 
-    Tensor src_value_tensor(std::make_shared<phi::DenseTensor>(src->value()));
-    std::vector<int64_t> dst_shape = src_value_tensor.shape();
     dst_shape[dst_shape.size() - 2] = rows_num;
     auto dst_dense_tensor = std::dynamic_pointer_cast<phi::DenseTensor>(
         paddle::experimental::full(IntArray(dst_shape), 0,
@@ -971,8 +972,58 @@ void EagerReducer::AllReduceSparse(EagerGroup *group,
     *(src->mutable_value()) =
         *(std::dynamic_pointer_cast<phi::DenseTensor>(dst_value_tensor.impl()));
   } else {
-    PADDLE_THROW(
-        platform::errors::Unimplemented("This case is not supported."));
+    std::vector<Tensor> rows_tensors;
+    std::vector<Tensor> values_tensors;
+
+    for (int i = 0; i < size_; ++i) {
+      std::vector<int64_t> value_tensor_shape = {
+          cpu_rows_num_ptr[i], dst_shape[dst_shape.size() - 1]};
+      Tensor rows_tensor = paddle::experimental::full(
+          IntArray({static_cast<int64_t>(cpu_rows_num_ptr[i])}), 0,
+          DataType::INT64, inner_place_);
+      Tensor values_tensor = paddle::experimental::full(
+          IntArray(value_tensor_shape), 0, src->value().dtype(), inner_place_);
+      std::vector<phi::DenseTensor> rows_dense_vector;
+      std::vector<phi::DenseTensor> values_dense_vector;
+
+      if (i == rank_) {
+        auto *rows_dense_tensor =
+            std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl())
+                .get();
+        framework::TensorFromVector<int64_t>(src_rows, *dev_ctx,
+                                             rows_dense_tensor);
+        values_tensor.set_impl(
+            std::make_shared<phi::DenseTensor>(src->value()));
+      }
+      rows_dense_vector.push_back(
+          *std::dynamic_pointer_cast<phi::DenseTensor>(rows_tensor.impl()));
+      values_dense_vector.push_back(
+          *std::dynamic_pointer_cast<phi::DenseTensor>(values_tensor.impl()));
+
+      auto b_opts = BroadcastOptions();
+      b_opts.source_rank = i;
+      process_group_->Broadcast(rows_dense_vector, rows_dense_vector, b_opts);
+      process_group_
+          ->Broadcast(values_dense_vector, values_dense_vector, b_opts)
+          ->Wait();
+      rows_tensors.push_back(rows_tensor);
+      values_tensors.push_back(values_tensor);
+    }
+
+    Tensor dst_rows_tensor =
+        paddle::experimental::concat(rows_tensors, phi::Scalar(0));
+    framework::Vector<int64_t> dst_rows_vector(rows_num, 0);
+    auto *dst_rows_dense_tensor =
+        std::dynamic_pointer_cast<phi::DenseTensor>(dst_rows_tensor.impl())
+            .get();
+    framework::TensorToVector<int64_t>(*dst_rows_dense_tensor, *dev_ctx,
+                                       &dst_rows_vector);
+    src->set_rows(dst_rows_vector);
+
+    Tensor dst_values_tensor =
+        paddle::experimental::concat(values_tensors, phi::Scalar(0));
+    *(src->mutable_value()) = *(
+        std::dynamic_pointer_cast<phi::DenseTensor>(dst_values_tensor.impl()));
   }
 }
 
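
Note on the hunk above: the former PADDLE_THROW branch, hit when ranks hold different numbers of sparse rows, is replaced by a real implementation: every rank in turn broadcasts its rows and values tensors (zero-filled buffers sized by cpu_rows_num_ptr), and each rank concatenates the received chunks along axis 0. A single-process sketch of this gather-by-broadcast idea, with std::vector standing in for DenseTensor and the loop index standing in for the broadcast source rank (names illustrative, not Paddle APIs):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Each entry of per_rank_rows plays the role of the buffer that rank
    // `src` fills before Broadcast copies it to every other rank.
    std::vector<int64_t> gather_uneven_rows(
        const std::vector<std::vector<int64_t>>& per_rank_rows) {
      std::vector<int64_t> dst_rows;
      for (size_t src = 0; src < per_rank_rows.size(); ++src) {
        const std::vector<int64_t>& buf = per_rank_rows[src];  // "broadcast"
        // Mirrors concat(rows_tensors, 0): append each rank's chunk in order.
        dst_rows.insert(dst_rows.end(), buf.begin(), buf.end());
      }
      return dst_rows;  // identical on every rank after the loop
    }

    int main() {
      // Three ranks holding 2, 1, and 3 sparse rows respectively.
      std::vector<std::vector<int64_t>> per_rank_rows = {{0, 4}, {7}, {1, 2, 9}};
      std::printf("gathered %zu rows\n", gather_uneven_rows(per_rank_rows).size());
      return 0;
    }
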
21 changes: 9 additions & 12 deletions paddle/fluid/distributed/ps/service/heter_client.h
@@ -171,19 +171,16 @@ class HeterClient {
   // switch client singleton
   static std::shared_ptr<HeterClient> GetSwitchInstance(
       const std::vector<std::string>& peer_endpoints, int32_t peer_role) {
+    std::unique_lock<std::mutex> lock(mtx_);
+    if (peer_endpoints.empty()) {
+      VLOG(4) << "init switch client failed, null peer_endpoints";
+    }
+    VLOG(4) << "peer role is: " << peer_role
+            << ", addr is: " << peer_endpoints[0];
     if (switch_s_instance_ == nullptr) {
-      std::unique_lock<std::mutex> lock(mtx_);
-      if (peer_endpoints.empty()) {
-        VLOG(4) << "init switch client failed, null peer_endpoints";
-      }
-      VLOG(4) << "peer role is: " << peer_role
-              << ", addr is: " << peer_endpoints[0];
-      if (switch_s_instance_ == nullptr) {
-        switch_s_instance_.reset(new HeterClient());
-        switch_s_instance_->SetPeerSwitchList(peer_endpoints);
-        switch_s_instance_->InitClientChannels(false, peer_endpoints,
-                                               peer_role);
-      }
+      switch_s_instance_.reset(new HeterClient());
+      switch_s_instance_->SetPeerSwitchList(peer_endpoints);
+      switch_s_instance_->InitClientChannels(false, peer_endpoints, peer_role);
     }
     return switch_s_instance_;
   }
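
Note on the hunk above: the old code read switch_s_instance_ before taking mtx_ (double-checked locking on a plain shared_ptr), which is a data race in C++; the new code takes the lock before the first read. A minimal sketch of the corrected pattern (Widget stands in for HeterClient; illustrative only):

    #include <memory>
    #include <mutex>

    struct Widget {};

    std::shared_ptr<Widget> instance;
    std::mutex mtx;

    std::shared_ptr<Widget> GetInstance() {
      std::unique_lock<std::mutex> lock(mtx);  // lock before the first read
      if (instance == nullptr) {
        instance.reset(new Widget());  // runs exactly once across threads
      }
      return instance;
    }

    int main() { return GetInstance() != nullptr ? 0 : 1; }
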
7 changes: 5 additions & 2 deletions paddle/fluid/distributed/ps/service/heter_server.cc
@@ -125,18 +125,21 @@ int SendAndRecvVariableHandler::SaveInSwitchWithShard(
     brpc::Controller* cntl) {
   VLOG(4) << "entering SaveInSwitchWithShard";
   int32_t group_id = request->group_id();
+  if (group_id >= FLAGS_heter_world_size) {
+    LOG(ERROR) << "group id exceed maxmium";
+  }
   auto& local_shard = _local_shards[group_id];
   auto& request_io_buffer = cntl->request_attachment();
   butil::IOBufBytesIterator io_buffer_itr(request_io_buffer);
   for (int idx = 0; idx < request->send_var_names_size(); idx++) {
     const auto& var_name = request->send_var_names(idx);
     const auto& var_size = request->vars_len(idx);
     WaitForVarsConsumed(group_id, var_name);
+    std::unique_lock<std::mutex> lk(scope_mutex_);
     auto& value = local_shard[var_name];
     value.resize(var_size);
     io_buffer_itr.copy_and_forward(reinterpret_cast<void*>(value.data()),
                                    var_size);
-    std::unique_lock<std::mutex> lk(scope_mutex_);
     vars_ready_flag[group_id][var_name] = 1;
     VLOG(4) << "saved var_name: " << var_name << "is saved ready!";
   }
@@ -162,11 +165,11 @@ int SendAndRecvVariableHandler::QueryInSwitchWithShard(
     VLOG(4) << "req var name: " << req_var_name;
     response->add_send_var_names(req_var_name);
     WaitForVarsProduced(group_id, req_var_name);
+    std::unique_lock<std::mutex> lk(scope_mutex_);
     auto itr = local_shard.find(req_var_name);
     auto& value = itr.value();
     response_io_buffer.append(value.data(), value.size());
     value.resize(0);  // free the memory
-    std::unique_lock<std::mutex> lk(scope_mutex_);
     vars_ready_flag[group_id][req_var_name] = 0;
     VLOG(4) << "query var_name: " << req_var_name << "is consumed ready!";
   }
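
Note on the two hunks above: the scope_mutex_ lock is moved so that the shard write (or read-and-clear) and the vars_ready_flag update happen inside one critical section; previously the buffer was resized and filled before the lock was taken, so a reader could observe the ready flag while the copy was still in flight. A condensed sketch of the corrected ordering (names are illustrative stand-ins for the handler's members):

    #include <cstddef>
    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <vector>

    std::mutex scope_mutex;
    std::unordered_map<std::string, std::vector<char>> local_shard;
    std::unordered_map<std::string, int> vars_ready_flag;

    void SaveVar(const std::string& var_name, const char* data, size_t var_size) {
      std::unique_lock<std::mutex> lk(scope_mutex);  // now taken before the write
      std::vector<char>& value = local_shard[var_name];
      value.assign(data, data + var_size);  // resize + copy, as in the handler
      vars_ready_flag[var_name] = 1;        // published only after the copy
    }

    int main() {
      SaveVar("w0", "abc", 3);
      return vars_ready_flag["w0"] == 1 ? 0 : 1;
    }
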
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/ps/service/ps_client.h
@@ -109,7 +109,7 @@ class PSClient {
                                        size_t table_id) = 0;  // reserved
 
   // firstly push dense param for parameter server
-  // this is neccessary because dense weight initialized in trainer on cold
+  // this is necessary because dense weight initialized in trainer on cold
   // start
   virtual std::future<int32_t> PushDenseParam(const Region *regions,
                                               size_t region_num,
45 changes: 35 additions & 10 deletions paddle/fluid/distributed/store/tcp_store.cc
@@ -19,21 +19,25 @@
 #include "paddle/fluid/distributed/store/tcp_store.h"
 #include "paddle/fluid/distributed/store/tcp_utils.h"
 #include "paddle/fluid/platform/enforce.h"
+#include "paddle/fluid/platform/flags.h"
 
 namespace paddle {
 namespace distributed {
 
 namespace detail {
 
-constexpr int INFTIME = -1;
+constexpr int INFTIME = 10000;  // 10 seconds
 
-std::unique_ptr<MasterDaemon> MasterDaemon::start(SocketType socket,
-                                                  int nranks) {
-  return std::make_unique<MasterDaemon>(socket, nranks);
+std::unique_ptr<MasterDaemon> MasterDaemon::start(SocketType socket, int nranks,
+                                                  int stop_check_timeout) {
+  return std::make_unique<MasterDaemon>(socket, nranks, stop_check_timeout);
 }
 
-MasterDaemon::MasterDaemon(SocketType socket, int nranks)
-    : _listen_socket(socket), _nranks(nranks) {
+MasterDaemon::MasterDaemon(SocketType socket, int nranks,
+                           int stop_check_timeout)
+    : _listen_socket(socket),
+      _nranks(nranks),
+      _stop_check_timeout(stop_check_timeout) {
   _background_thread = std::thread{&MasterDaemon::run, this};
 }
 
@@ -86,6 +90,10 @@ void MasterDaemon::_do_get(SocketType socket) {
 
 void MasterDaemon::_do_stop(SocketType socket) {
   VLOG(3) << "MasterDaemon::_do_stop";
+  if (!_has_stop) {
+    _stop_time = std::chrono::system_clock::now();
+  }
+  _has_stop = true;
   ReplyType value = ReplyType::STOP_WAIT;
   tcputils::send_value<ReplyType>(socket, value);
   if (--_nranks == 0) {
@@ -115,6 +123,20 @@ void MasterDaemon::run() {
 #endif
 
   while (!_stop) {
+    auto end_time = std::chrono::system_clock::now();
+    if (_has_stop) {
+      std::chrono::duration<double> diff = end_time - _stop_time;
+      int elapsed_seconds = static_cast<int>(diff.count());
+      PADDLE_ENFORCE_LT(
+          elapsed_seconds, _stop_check_timeout,
+          platform::errors::Fatal(
+              "%d seconds elapsed after the first worker "
+              "stopped, so we think there may be something wrong and will "
+              "stop the master worker. You can use "
+              "'export FLAGS_stop_check_timeout=3600'"
+              " to change the timeout value in seconds. The default one is 900",
+              elapsed_seconds));
+    }
     for (size_t i = 0; i < fds.size(); i++) {
       fds[i].revents = 0;
     }
@@ -173,10 +195,12 @@
 }
 }
 
-std::unique_ptr<TCPServer> TCPServer::create(uint16_t port, int nranks) {
+std::unique_ptr<TCPServer> TCPServer::create(uint16_t port, int nranks,
+                                             int stop_check_timeout) {
   int socket = tcputils::tcp_listen("", std::to_string(port), AF_INET);
   auto server = std::make_unique<TCPServer>();
-  server->_master_daemon = MasterDaemon::start(socket, nranks);
+  server->_master_daemon =
+      MasterDaemon::start(socket, nranks, stop_check_timeout);
   return server;
 }
 
@@ -219,10 +243,11 @@ std::vector<T> TCPClient::receive_vector() {
 }  // namespace detail
 
 TCPStore::TCPStore(std::string host, uint16_t port, bool is_master,
-                   size_t num_workers, std::chrono::seconds timeout)
+                   size_t num_workers, std::chrono::seconds timeout,
+                   int stop_check_timeout)
     : Store(timeout), _is_master(is_master), _num_workers(num_workers) {
   if (_is_master) {
-    _server = detail::TCPServer::create(port, num_workers);
+    _server = detail::TCPServer::create(port, num_workers, stop_check_timeout);
   }
 
   _client = detail::TCPClient::connect(host, port);
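
Note on the hunks above: two cooperating changes implement a shutdown watchdog. INFTIME goes from -1 (poll blocks forever) to 10000 ms so that MasterDaemon::run() wakes up periodically even without socket traffic; _do_stop records when the first worker stops; and each pass through the loop then enforces that the remaining workers stop within _stop_check_timeout seconds (default 900, overridable via FLAGS_stop_check_timeout) instead of hanging indefinitely. A condensed standalone sketch of the watchdog logic (class and method names illustrative, not the Paddle members):

    #include <chrono>
    #include <stdexcept>

    class StopWatchdog {
     public:
      explicit StopWatchdog(int stop_check_timeout)
          : _stop_check_timeout(stop_check_timeout) {}

      void OnStopRequest() {  // what _do_stop now does first
        if (!_has_stop) _stop_time = std::chrono::system_clock::now();
        _has_stop = true;
      }

      void Check() const {  // run once per (now 10-second) poll iteration
        if (!_has_stop) return;
        std::chrono::duration<double> diff =
            std::chrono::system_clock::now() - _stop_time;
        if (static_cast<int>(diff.count()) >= _stop_check_timeout) {
          throw std::runtime_error(
              "not all workers stopped within FLAGS_stop_check_timeout seconds");
        }
      }

     private:
      bool _has_stop = false;
      int _stop_check_timeout;
      std::chrono::system_clock::time_point _stop_time;
    };

    int main() {
      StopWatchdog dog(900);  // default from the error message above
      dog.OnStopRequest();
      dog.Check();  // throws only once the timeout has elapsed
      return 0;
    }
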
