diff --git a/src/collective/aggregator.h b/src/collective/aggregator.h index 8a5b31c36546..9f9028c8e25b 100644 --- a/src/collective/aggregator.h +++ b/src/collective/aggregator.h @@ -14,6 +14,7 @@ #include "communicator-inl.h" #include "xgboost/collective/result.h" // for Result #include "xgboost/data.h" // for MetaINfo +#include "../processing/processor.h" // for Processor namespace xgboost::collective { @@ -69,7 +70,7 @@ void ApplyWithLabels(Context const*, MetaInfo const& info, void* buffer, std::si * @param result The HostDeviceVector storing the results. * @param function The function used to calculate the results. */ -template +template void ApplyWithLabels(Context const*, MetaInfo const& info, HostDeviceVector* result, Function&& function) { if (info.IsVerticalFederated()) { @@ -96,8 +97,46 @@ void ApplyWithLabels(Context const*, MetaInfo const& info, HostDeviceVector* } collective::Broadcast(&size, sizeof(std::size_t), 0); - result->Resize(size); - collective::Broadcast(result->HostPointer(), size * sizeof(T), 0); + if (info.IsSecure() && is_gpair) { + // Under secure mode, gpairs will be processed to vector and encrypt + // information only available on rank 0 + std::size_t buffer_size{}; + std::int8_t *buffer; + if (collective::GetRank() == 0) { + std::vector vector_gh; + for (std::size_t i = 0; i < size; i++) { + auto gpair = result->HostVector()[i]; + // cast from GradientPair to float pointer + auto gpair_ptr = reinterpret_cast(&gpair); + // save to vector + vector_gh.push_back(gpair_ptr[0]); + vector_gh.push_back(gpair_ptr[1]); + } + // provide the vectors to the processor interface + size_t size; + auto buf = processor_instance->ProcessGHPairs(&size, vector_gh); + buffer_size = size; + buffer = reinterpret_cast(buf); + } + + // broadcast the buffer size for other ranks to prepare + collective::Broadcast(&buffer_size, sizeof(std::size_t), 0); + // prepare buffer on passive parties for satisfying broadcast mpi call + if (collective::GetRank() != 0) { + buffer = reinterpret_cast(malloc(buffer_size)); + } + + // broadcast the data buffer holding processed gpairs + collective::Broadcast(buffer, buffer_size, 0); + + // call HandleGHPairs + size_t size; + processor_instance->HandleGHPairs(&size, buffer, buffer_size); + } else { + // clear text mode, broadcast the data directly + result->Resize(size); + collective::Broadcast(result->HostPointer(), size * sizeof(T), 0); + } } else { std::forward(function)(); } diff --git a/src/collective/communicator.cc b/src/collective/communicator.cc index 7fabe50b465d..0386ccb97fd2 100644 --- a/src/collective/communicator.cc +++ b/src/collective/communicator.cc @@ -1,6 +1,7 @@ /*! * Copyright 2022 XGBoost contributors */ +#include #include "communicator.h" #include "comm.h" @@ -9,14 +10,39 @@ #include "rabit_communicator.h" #if defined(XGBOOST_USE_FEDERATED) -#include "../../plugin/federated/federated_communicator.h" + #include "../../plugin/federated/federated_communicator.h" #endif +#include "../processing/processor.h" +processing::Processor *processor_instance; + namespace xgboost::collective { thread_local std::unique_ptr Communicator::communicator_{new NoOpCommunicator()}; thread_local CommunicatorType Communicator::type_{}; thread_local std::string Communicator::nccl_path_{}; +std::map json_to_map(xgboost::Json const& config, std::string key) { + auto json_map = xgboost::OptionalArg(config, key, xgboost::JsonObject::Map{}); + std::map params{}; + for (auto entry : json_map) { + std::string text; + xgboost::Value* value = &(entry.second.GetValue()); + if (value->Type() == xgboost::Value::ValueKind::kString) { + text = reinterpret_cast(value)->GetString(); + } else if (value->Type() == xgboost::Value::ValueKind::kInteger) { + auto num = reinterpret_cast(value)->GetInteger(); + text = std::to_string(num); + } else if (value->Type() == xgboost::Value::ValueKind::kNumber) { + auto num = reinterpret_cast(value)->GetNumber(); + text = std::to_string(num); + } else { + text = "Unsupported type "; + } + params[entry.first] = text; + } + return params; +} + void Communicator::Init(Json const& config) { auto nccl = OptionalArg(config, "dmlc_nccl_path", std::string{DefaultNcclName()}); nccl_path_ = nccl; @@ -38,19 +64,35 @@ void Communicator::Init(Json const& config) { } case CommunicatorType::kFederated: { #if defined(XGBOOST_USE_FEDERATED) - communicator_.reset(FederatedCommunicator::Create(config)); + communicator_.reset(FederatedCommunicator::Create(config)); + // Get processor configs + std::string plugin_name{}; + std::string loader_params_key{}; + std::string loader_params_map{}; + std::string proc_params_key{}; + std::string proc_params_map{}; + plugin_name = OptionalArg(config, "plugin_name", plugin_name); + // Initialize processor if plugin_name is provided + if (!plugin_name.empty()) { + std::map loader_params = json_to_map(config, "loader_params"); + std::map proc_params = json_to_map(config, "proc_params"); + processing::ProcessorLoader loader(loader_params); + processor_instance = loader.load(plugin_name); + processor_instance->Initialize(collective::GetRank() == 0, proc_params); + } #else - LOG(FATAL) << "XGBoost is not compiled with Federated Learning support."; + LOG(FATAL) << "XGBoost is not compiled with Federated Learning support."; #endif - break; - } - case CommunicatorType::kInMemory: - case CommunicatorType::kInMemoryNccl: { - communicator_.reset(InMemoryCommunicator::Create(config)); - break; - } - case CommunicatorType::kUnknown: - LOG(FATAL) << "Unknown communicator type."; + break; + } + + case CommunicatorType::kInMemory: + case CommunicatorType::kInMemoryNccl: { + communicator_.reset(InMemoryCommunicator::Create(config)); + break; + } + case CommunicatorType::kUnknown: + LOG(FATAL) << "Unknown communicator type."; } } @@ -58,6 +100,10 @@ void Communicator::Init(Json const& config) { void Communicator::Finalize() { communicator_->Shutdown(); communicator_.reset(new NoOpCommunicator()); + if (processor_instance != nullptr) { + processor_instance->Shutdown(); + processor_instance = nullptr; + } } #endif } // namespace xgboost::collective diff --git a/src/learner.cc b/src/learner.cc index eed9dd5cdcd7..b8212e2b2868 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -846,7 +846,7 @@ class LearnerConfiguration : public Learner { void InitEstimation(MetaInfo const& info, linalg::Tensor* base_score) { base_score->Reshape(1); - collective::ApplyWithLabels(this->Ctx(), info, base_score->Data(), + collective::ApplyWithLabels(this->Ctx(), info, base_score->Data(), [&] { UsePtr(obj_)->InitEstimation(info, base_score); }); } }; @@ -1472,8 +1472,9 @@ class LearnerImpl : public LearnerIO { void GetGradient(HostDeviceVector const& preds, MetaInfo const& info, std::int32_t iter, linalg::Matrix* out_gpair) { out_gpair->Reshape(info.num_row_, this->learner_model_param_.OutputLength()); - collective::ApplyWithLabels(&ctx_, info, out_gpair->Data(), - [&] { obj_->GetGradient(preds, info, iter, out_gpair); }); + // calculate gradient and communicate + collective::ApplyWithLabels(&ctx_, info, out_gpair->Data(), + [&] { obj_->GetGradient(preds, info, iter, out_gpair); }); } /*! \brief random number transformation seed. */ diff --git a/src/processing/plugins/mock_processor.cc b/src/processing/plugins/mock_processor.cc new file mode 100644 index 000000000000..918ab9e8e0e6 --- /dev/null +++ b/src/processing/plugins/mock_processor.cc @@ -0,0 +1,118 @@ +/** + * Copyright 2014-2024 by XGBoost Contributors + */ +#include +#include +#include +#include "./mock_processor.h" + +const char kSignature[] = "NVDADAM1"; // DAM (Direct Accessible Marshalling) V1 +const int64_t kPrefixLen = 24; + +bool ValidDam(void *buffer, std::size_t size) { + return size >= kPrefixLen && memcmp(buffer, kSignature, strlen(kSignature)) == 0; +} + +void* MockProcessor::ProcessGHPairs(std::size_t *size, const std::vector& pairs) { + *size = kPrefixLen + pairs.size()*10*8; // Assume encrypted size is 10x + + int64_t buf_size = *size; + // This memory needs to be freed + char *buf = static_cast(calloc(*size, 1)); + memcpy(buf, kSignature, strlen(kSignature)); + memcpy(buf + 8, &buf_size, 8); + memcpy(buf + 16, &kDataTypeGHPairs, 8); + + // Simulate encryption by duplicating value 10 times + int index = kPrefixLen; + for (auto value : pairs) { + for (std::size_t i = 0; i < 10; i++) { + memcpy(buf+index, &value, 8); + index += 8; + } + } + + // Save pairs for future operations + this->gh_pairs_ = new std::vector(pairs); + + return buf; +} + + +void* MockProcessor::HandleGHPairs(std::size_t *size, void *buffer, std::size_t buf_size) { + *size = buf_size; + if (!ValidDam(buffer, *size)) { + return buffer; + } + + // For mock, this call is used to set gh_pairs for passive sites + if (!active_) { + int8_t *ptr = static_cast(buffer); + ptr += kPrefixLen; + double *pairs = reinterpret_cast(ptr); + std::size_t num = (buf_size - kPrefixLen) / 8; + gh_pairs_ = new std::vector(); + for (std::size_t i = 0; i < num; i += 10) { + gh_pairs_->push_back(pairs[i]); + } + } + + auto result = malloc(buf_size); + memcpy(result, buffer, buf_size); + + return result; +} + +void *MockProcessor::ProcessAggregation(std::size_t *size, std::map> nodes) { + int total_bin_size = cuts_.back(); + int histo_size = total_bin_size*2; + *size = kPrefixLen + 8*histo_size*nodes.size(); + int64_t buf_size = *size; + int8_t *buf = static_cast(calloc(buf_size, 1)); + memcpy(buf, kSignature, strlen(kSignature)); + memcpy(buf + 8, &buf_size, 8); + memcpy(buf + 16, &kDataTypeHisto, 8); + + double *histo = reinterpret_cast(buf + kPrefixLen); + for ( const auto &node : nodes ) { + auto rows = node.second; + for (const auto &row_id : rows) { + auto num = cuts_.size() - 1; + for (std::size_t f = 0; f < num; f++) { + int slot = slots_[f + num*row_id]; + if ((slot < 0) || (slot >= total_bin_size)) { + continue; + } + + auto g = (*gh_pairs_)[row_id*2]; + auto h = (*gh_pairs_)[row_id*2+1]; + histo[slot*2] += g; + histo[slot*2+1] += h; + } + } + histo += histo_size; + } + + return buf; +} + +std::vector MockProcessor::HandleAggregation(void *buffer, std::size_t buf_size) { + std::vector result = std::vector(); + + int8_t* ptr = static_cast(buffer); + auto rest_size = buf_size; + + while (rest_size > kPrefixLen) { + if (!ValidDam(ptr, rest_size)) { + break; + } + int64_t *size_ptr = reinterpret_cast(ptr + 8); + double *array_start = reinterpret_cast(ptr + kPrefixLen); + auto array_size = (*size_ptr - kPrefixLen)/8; + result.insert(result.end(), array_start, array_start + array_size); + rest_size -= *size_ptr; + ptr = ptr + *size_ptr; + } + + return result; +} diff --git a/src/processing/plugins/mock_processor.h b/src/processing/plugins/mock_processor.h new file mode 100644 index 000000000000..48ca1e812a44 --- /dev/null +++ b/src/processing/plugins/mock_processor.h @@ -0,0 +1,65 @@ +/** + * Copyright 2014-2024 by XGBoost Contributors + */ +#pragma once +#include +#include +#include +#include "../processor.h" + +// Data type definition +const int64_t kDataTypeGHPairs = 1; +const int64_t kDataTypeHisto = 2; +const int64_t kDataTypeAggregatedHisto = 3; + +class MockProcessor: public processing::Processor { + private: + bool active_ = false; + const std::map *params_{nullptr}; + std::vector *gh_pairs_{nullptr}; + std::vector cuts_; + std::vector slots_; + + public: + ~MockProcessor() { + if (gh_pairs_) { + gh_pairs_->clear(); + delete gh_pairs_; + } + } + + void Initialize(bool active, std::map params) override { + this->active_ = active; + this->params_ = ¶ms; + } + + void Shutdown() override { + if (gh_pairs_) { + gh_pairs_->clear(); + delete gh_pairs_; + } + gh_pairs_ = nullptr; + cuts_.clear(); + slots_.clear(); + } + + void FreeBuffer(void *buffer) override { + free(buffer); + } + + void* ProcessGHPairs(size_t *size, const std::vector& pairs) override; + + void* HandleGHPairs(size_t *size, void *buffer, size_t buf_size) override; + + void InitAggregationContext(const std::vector &cuts, + const std::vector &slots) override { + this->cuts_ = cuts; + if (this->slots_.empty()) { + this->slots_ = slots; + } + } + + void *ProcessAggregation(size_t *size, std::map> nodes) override; + + std::vector HandleAggregation(void *buffer, size_t buf_size) override; +}; diff --git a/src/processing/processor.h b/src/processing/processor.h new file mode 100644 index 000000000000..f3add42cd40f --- /dev/null +++ b/src/processing/processor.h @@ -0,0 +1,118 @@ +/** + * Copyright 2014-2024 by XGBoost Contributors + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace processing { + +const char kLibraryPath[] = "LIBRARY_PATH"; +const char kMockProcessor[] = "mock"; +const char kLoadFunc[] = "LoadProcessor"; + +/*! \brief An processor interface to handle tasks that require external library through plugins */ +class Processor { + public: + /*! + * \brief Virtual destructor + * + */ + virtual ~Processor() = default; + + /*! + * \brief Initialize the processor + * + * \param active If true, this is the active node + * \param params Optional parameters + */ + virtual void Initialize(bool active, std::map params) = 0; + + /*! + * \brief Shutdown the processor and free all the resources + * + */ + virtual void Shutdown() = 0; + + /*! + * \brief Free buffer + * + * \param buffer Any buffer returned by the calls from the plugin + */ + virtual void FreeBuffer(void* buffer) = 0; + + /*! + * \brief Preparing g & h pairs to be sent to other clients by active client + * + * \param size The size of the buffer + * \param pairs g&h pairs in a vector (g1, h1, g2, h2 ...) for every sample + * + * \return The encoded buffer to be sent + */ + virtual void* ProcessGHPairs(size_t *size, const std::vector& pairs) = 0; + + /*! + * \brief Handle buffers with encoded pairs received from broadcast + * + * \param size Output buffer size + * \param The encoded buffer + * \param The encoded buffer size + * + * \return The encoded buffer + */ + virtual void* HandleGHPairs(size_t *size, void *buffer, size_t buf_size) = 0; + + /*! + * \brief Initialize aggregation context by providing global GHistIndexMatrix + * + * \param cuts The cut point for each feature + * \param slots The slot assignment in a flattened matrix for each feature/row. + * The size is num_feature*num_row + */ + virtual void InitAggregationContext(const std::vector &cuts, + const std::vector &slots) = 0; + + /*! + * \brief Prepare row set for aggregation + * + * \param size The output buffer size + * \param nodes Map of node and the rows belong to this node + * + * \return The encoded buffer to be sent via AllGatherV + */ + virtual void *ProcessAggregation(size_t *size, std::map> nodes) = 0; + + /*! + * \brief Handle all gather result + * + * \param buffer Buffer from all gather, only buffer from active site is needed + * \param buf_size The size of the buffer + * + * \return A flattened vector of histograms for each site, each node in the form of + * site1_node1, site1_node2 site1_node3, site2_node1, site2_node2, site2_node3 + */ + virtual std::vector HandleAggregation(void *buffer, size_t buf_size) = 0; +}; + +class ProcessorLoader { + private: + std::map params; + void *handle_ = NULL; + + public: + ProcessorLoader(): params{} {} + + explicit ProcessorLoader(const std::map& params): params(params) {} + + Processor* load(const std::string& plugin_name); + + void unload(); +}; + +} // namespace processing + +extern processing::Processor *processor_instance; diff --git a/src/processing/processor_loader.cc b/src/processing/processor_loader.cc new file mode 100644 index 000000000000..dfb267a35d6e --- /dev/null +++ b/src/processing/processor_loader.cc @@ -0,0 +1,90 @@ +/** + * Copyright 2014-2024 by XGBoost Contributors + */ + +#include + +#if defined(_WIN32) || defined(_WIN64) +#include +#else +#include +#endif + +#include "./processor.h" +#include "plugins/mock_processor.h" + +namespace processing { + using LoadFunc = Processor *(const char *); + + Processor* ProcessorLoader::load(const std::string& plugin_name) { + // Dummy processor for unit testing without loading a shared library + if (plugin_name == kMockProcessor) { + return new MockProcessor(); + } + + auto lib_name = "libproc_" + plugin_name; + + auto extension = +#if defined(_WIN32) || defined(_WIN64) + ".dll"; +#elif defined(__APPLE__) || defined(__MACH__) + ".dylib"; +#else + ".so"; +#endif + auto lib_file_name = lib_name + extension; + + std::string lib_path; + + if (params.find(kLibraryPath) == params.end()) { + lib_path = lib_file_name; + } else { + auto p = params[kLibraryPath]; + if (p.back() != '/' && p.back() != '\\') { + p += '/'; + } + lib_path = p + lib_file_name; + } + +#if defined(_WIN32) || defined(_WIN64) + handle_ = reinterpret_cast(LoadLibrary(lib_path.c_str())); + if (!handle_) { + std::cerr << "Failed to load the dynamic library" << std::endl; + return NULL; + } + + void* func_ptr = reinterpret_cast(GetProcAddress((HMODULE)handle_, kLoadFunc)); + if (!func_ptr) { + std::cerr << "Failed to find loader function." << std::endl; + return NULL; + } +#else + handle_ = dlopen(lib_path.c_str(), RTLD_LAZY); + if (!handle_) { + std::cerr << "Failed to load the dynamic library: " << dlerror() << std::endl; + return NULL; + } + void* func_ptr = dlsym(handle_, kLoadFunc); + if (!func_ptr) { + std::cerr << "Failed to find loader function: " << dlerror() << std::endl; + return NULL; + } +#endif + + auto func = reinterpret_cast(func_ptr); + + return (*func)(plugin_name.c_str()); + } + + void ProcessorLoader::unload() { +#if defined(_WIN32) + if (handle_) { + FreeLibrary(handle_); + } +#else + if (handle_) { + dlclose(handle_); + } +#endif + } +} // namespace processing diff --git a/src/tree/hist/histogram.h b/src/tree/hist/histogram.h index d4cea58d0f72..d34c43cd4565 100644 --- a/src/tree/hist/histogram.h +++ b/src/tree/hist/histogram.h @@ -10,6 +10,7 @@ #include // for function #include // for move #include // for vector +#include // for map #include "../../collective/communicator-inl.h" // for Allreduce #include "../../collective/communicator.h" // for Operation @@ -27,6 +28,7 @@ #include "xgboost/logging.h" // for CHECK_GE #include "xgboost/span.h" // for Span #include "xgboost/tree_model.h" // for RegTree +#include "../../processing/processor.h" // for Processor namespace xgboost::tree { /** @@ -51,6 +53,7 @@ class HistogramBuilder { bool is_distributed_{false}; bool is_col_split_{false}; bool is_secure_{false}; + xgboost::common::Span hist_data; public: /** @@ -76,20 +79,51 @@ class HistogramBuilder { std::vector const &nodes_to_build, common::RowSetCollection const &row_set_collection, common::Span gpair_h, bool force_read_by_column) { - // Parallel processing by nodes and data in each node - common::ParallelFor2d(space, this->n_threads_, [&](size_t nid_in_set, common::Range1d r) { - const auto tid = static_cast(omp_get_thread_num()); - bst_node_t const nidx = nodes_to_build[nid_in_set]; - auto elem = row_set_collection[nidx]; - auto start_of_row_set = std::min(r.begin(), elem.Size()); - auto end_of_row_set = std::min(r.end(), elem.Size()); - auto rid_set = common::RowSetCollection::Elem(elem.begin + start_of_row_set, - elem.begin + end_of_row_set, nidx); - auto hist = buffer_.GetInitializedHist(tid, nid_in_set); - if (rid_set.Size() != 0) { - common::BuildHist(gpair_h, rid_set, gidx, hist, force_read_by_column); + if (is_distributed_ && is_col_split_ && is_secure_) { + // Call the interface to transmit gidx information to the secure worker + // for encrypted histogram compuation + auto slots = std::vector(); + auto num_rows = row_set_collection[0].Size(); + auto cuts = gidx.Cuts().Ptrs(); + for (std::size_t row = 0; row < num_rows; row++) { + for (std::size_t f = 0; f < cuts.size()-1; f++) { + auto slot = gidx.GetGindex(row, f); + slots.push_back(slot); + } } - }); + processor_instance->InitAggregationContext(cuts, slots); + // Further use the row set collection info to + // get the encrypted histogram from the secure worker + auto node_map = std::map>(); + for (auto node : nodes_to_build) { + auto rows = std::vector(); + auto elem = row_set_collection[node]; + for (auto it = elem.begin; it != elem.end; ++it) { + auto row_id = *it; + rows.push_back(row_id); + } + node_map.insert({node, rows}); + } + std::size_t buf_size; + auto buf = processor_instance->ProcessAggregation(&buf_size, node_map); + hist_data = xgboost::common::Span(static_cast(buf), buf_size); + } else { + // Parallel processing by nodes and data in each node + common::ParallelFor2d(space, this->n_threads_, + [&](std::size_t nid_in_set, common::Range1d r) { + const auto tid = static_cast(omp_get_thread_num()); + bst_node_t const nidx = nodes_to_build[nid_in_set]; + auto elem = row_set_collection[nidx]; + auto start_of_row_set = std::min(r.begin(), elem.Size()); + auto end_of_row_set = std::min(r.end(), elem.Size()); + auto rid_set = common::RowSetCollection::Elem(elem.begin + start_of_row_set, + elem.begin + end_of_row_set, nidx); + auto hist = buffer_.GetInitializedHist(tid, nid_in_set); + if (rid_set.Size() != 0) { + common::BuildHist(gpair_h, rid_set, gidx, hist, force_read_by_column); + } + }); + } } /** @@ -157,7 +191,7 @@ class HistogramBuilder { // Add the local histogram cache to the parallel buffer before processing the first page. auto n_nodes = nodes_to_build.size(); std::vector target_hists(n_nodes); - for (size_t i = 0; i < n_nodes; ++i) { + for (std::size_t i = 0; i < n_nodes; ++i) { auto const nidx = nodes_to_build[i]; target_hists[i] = hist_[nidx]; } @@ -180,7 +214,7 @@ class HistogramBuilder { common::BlockedSpace2d space( nodes_to_build.size(), [&](std::size_t) { return n_total_bins; }, 1024); - common::ParallelFor2d(space, this->n_threads_, [&](size_t node, common::Range1d r) { + common::ParallelFor2d(space, this->n_threads_, [&](std::size_t node, common::Range1d r) { // Merging histograms from each thread. this->buffer_.ReduceHist(node, r.begin(), r.end()); }); @@ -194,17 +228,41 @@ class HistogramBuilder { } if (is_distributed_ && is_col_split_ && is_secure_) { - // Under secure vertical mode, we perform allgather for all nodes + // Under secure vertical mode, we perform allgather to get the global histogram. + // note that only Label Owner needs the global histogram CHECK(!nodes_to_build.empty()); - // in theory the operation is AllGather, under current histogram setting of - // same length with 0s for empty slots, - // AllReduce is the most efficient way of achieving the global histogram + + // Front item of nodes_to_build auto first_nidx = nodes_to_build.front(); + // *2 because we have a pair of g and h for each histogram item std::size_t n = n_total_bins * nodes_to_build.size() * 2; - collective::Allreduce( - reinterpret_cast(this->hist_[first_nidx].data()), n); - } + // Perform AllGather + auto hist_vec = std::vector(hist_data.data(), + hist_data.data() + hist_data.size()); + auto hist_entries = collective::AllgatherV(hist_vec); + // Call interface here to post-process the messages + std::vector hist_aggr = processor_instance->HandleAggregation( + hist_entries.data(), hist_entries.size()); + + // Update histogram for label owner + if (collective::GetRank() == 0) { + // iterator of the beginning of the vector + auto it = reinterpret_cast(this->hist_[first_nidx].data()); + // iterate through the hist vector of the label owner + for (std::size_t i = 0; i < n; i++) { + // get the sum of the entries from all ranks + double hist_sum = 0.0; + for (std::size_t rank_idx = 0; rank_idx < hist_aggr.size()/n; rank_idx++) { + int flat_idx = rank_idx * n + i; + hist_sum += hist_aggr[flat_idx]; + } + // update rank 0's record with the global histogram + *it = hist_sum; + it++; + } + } + } common::BlockedSpace2d const &subspace = nodes_to_trick.size() == nodes_to_build.size() ? space @@ -240,7 +298,7 @@ common::BlockedSpace2d ConstructHistSpace(Partitioner const &partitioners, // has significant variance. std::vector partition_size(nodes_to_build.size(), 0); for (auto const &partition : partitioners) { - size_t k = 0; + std::size_t k = 0; for (auto nidx : nodes_to_build) { auto n_rows_in_node = partition.Partitions()[nidx].Size(); partition_size[k] = std::max(partition_size[k], n_rows_in_node); @@ -248,7 +306,8 @@ common::BlockedSpace2d ConstructHistSpace(Partitioner const &partitioners, } } common::BlockedSpace2d space{ - nodes_to_build.size(), [&](size_t nidx_in_set) { return partition_size[nidx_in_set]; }, 256}; + nodes_to_build.size(), + [&](std::size_t nidx_in_set) { return partition_size[nidx_in_set]; }, 256}; return space; } @@ -325,6 +384,7 @@ class MultiHistogramBuilder { CHECK_EQ(gpair.Shape(1), p_tree->NumTargets()); for (bst_target_t t = 0; t < p_tree->NumTargets(); ++t) { auto t_gpair = gpair.Slice(linalg::All(), t); + CHECK_EQ(t_gpair.Shape(0), p_fmat->Info().num_row_); this->target_builders_[t].BuildHist(page_idx, space, page, partitioners[page_idx].Partitions(), nodes_to_build, diff --git a/tests/cpp/processing/test_processor.cc b/tests/cpp/processing/test_processor.cc new file mode 100644 index 000000000000..7ba1292439cf --- /dev/null +++ b/tests/cpp/processing/test_processor.cc @@ -0,0 +1,95 @@ +/*! + * Copyright 2024 XGBoost contributors + */ +#include + +#include "../../../src/processing/processor.h" + +const double kError = 1E-10; + +class ProcessorTest : public testing::Test { + public: + void SetUp() override { + auto loader = processing::ProcessorLoader(); + processor_ = loader.load(processing::kMockProcessor); + processor_->Initialize(true, {}); + loader.unload(); + } + + void TearDown() override { + processor_->Shutdown(); + delete processor_; + processor_ = nullptr; + } + + protected: + processing::Processor *processor_ = nullptr; + + // Test data, 4 Rows, 2 Features + std::vector gh_pairs_ = {1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1}; // 4 Rows, 8 GH Pairs + std::vector cuts_ = {0, 4, 10}; // 2 features, one has 4 bins, another 6 + std::vector slots_ = { + 0, 4, + 1, 9, + 3, 7, + 0, 4 + }; + + std::vector node0_ = {0, 2}; + std::vector node1_ = {1, 3}; + + std::map> nodes_ = {{0, node0_}, + {1, node1_}}; + + std::vector histo1_ = {1.0, 2.0, 3.0, 4.0}; + std::vector histo2_ = {5.0, 6.0, 7.0, 8.0}; +}; + +TEST_F(ProcessorTest, TestLoading) { + auto base_class = dynamic_cast(processor_); + ASSERT_NE(base_class, nullptr); +} + +TEST_F(ProcessorTest, TestGHEncoding) { + size_t buf_size; + auto buffer = processor_->ProcessGHPairs(&buf_size, gh_pairs_); + size_t expected_size = 24; // DAM header size + expected_size += gh_pairs_.size() * 10 * 8; // Dummy plugin duplicate each number 10x to simulate encryption + ASSERT_EQ(buf_size, expected_size); + + size_t new_size; + auto new_buffer = processor_->HandleGHPairs(&new_size, buffer, buf_size); + // Dummy plugin doesn't change buffer + ASSERT_EQ(new_size, buf_size); + ASSERT_EQ(0, memcmp(buffer, new_buffer, buf_size)); + + // Clean up + processor_->FreeBuffer(buffer); + processor_->FreeBuffer(new_buffer); +} + +TEST_F(ProcessorTest, TestAggregation) { + size_t buf_size; + auto gh_buffer = processor_->ProcessGHPairs(&buf_size, gh_pairs_); // Pass the GH pairs to the plugin + + processor_->InitAggregationContext(cuts_, slots_); + auto buffer = processor_->ProcessAggregation(&buf_size, nodes_); + auto histos = processor_->HandleAggregation(buffer, buf_size); + double expected_result[] = { + 1.1, 2.1, 0, 0, 0, 0, 5.1, 6.1, 1.1, 2.1, + 0, 0, 0, 0, 5.1, 6.1, 0, 0, 0, 0, + 7.1, 8.1, 3.1, 4.1, 0, 0, 0, 0, 7.1, 8.1, + 0, 0, 0, 0, 0, 0, 0, 0, 3.1, 4.1 + }; + + auto expected_size = sizeof(expected_result)/sizeof(expected_result[0]); + + ASSERT_EQ(expected_size, histos.size()) << "Histograms have different sizes"; + + for (size_t i = 0; i < histos.size(); ++i) { + EXPECT_NEAR(expected_result[i], histos[i], kError) << "Histogram differs at index " << i; + } + + processor_->FreeBuffer(buffer); + processor_->FreeBuffer(gh_buffer); +} diff --git a/tests/cpp/tree/hist/test_histogram.cc b/tests/cpp/tree/hist/test_histogram.cc index 76428d1d83b4..481ae18c180c 100644 --- a/tests/cpp/tree/hist/test_histogram.cc +++ b/tests/cpp/tree/hist/test_histogram.cc @@ -228,7 +228,7 @@ void TestBuildHistogram(bool is_distributed, bool force_read_by_column, bool is_ Context ctx; auto p_fmat = RandomDataGenerator(kNRows, kNCols, 0.8).Seed(3).GenerateDMatrix(); - if (is_col_split && !is_secure) { + if (is_col_split) { p_fmat = std::shared_ptr{ p_fmat->SliceCol(collective::GetWorldSize(), collective::GetRank())}; } @@ -254,7 +254,6 @@ void TestBuildHistogram(bool is_distributed, bool force_read_by_column, bool is_ row_indices.resize(kNRows); std::iota(row_indices.begin(), row_indices.end(), 0); row_set_collection.Init(); - CPUExpandEntry node{RegTree::kRoot, tree.GetDepth(0)}; std::vector nodes_to_build{node.nid}; std::vector dummy_sub; @@ -288,8 +287,8 @@ void TestBuildHistogram(bool is_distributed, bool force_read_by_column, bool is_ GradientPairPrecise sol = histogram_expected[i]; double grad = sol.GetGrad(); double hess = sol.GetHess(); - if (is_distributed && (!is_col_split || (is_secure && is_col_split))) { - // the solution also needs to be allreduce + if (is_distributed && !is_col_split) { + // row split, all party holds the same data collective::Allreduce(&grad, 1); collective::Allreduce(&hess, 1); } @@ -317,11 +316,6 @@ TEST(CPUHistogram, BuildHistDistColSplit) { RunWithInMemoryCommunicator(kWorkers, TestBuildHistogram, true, false, true, false); } -TEST(CPUHistogram, BuildHistDistColSplitSecure) { - auto constexpr kWorkers = 4; - RunWithInMemoryCommunicator(kWorkers, TestBuildHistogram, true, true, true, true); - RunWithInMemoryCommunicator(kWorkers, TestBuildHistogram, true, false, true, true); -} namespace { template