From 7c00e5308d37e96e559fb986a4982859415ff518 Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Tue, 24 Oct 2023 12:27:36 -0700 Subject: [PATCH] Support cudf as column-split input --- include/xgboost/data.h | 3 +- python-package/xgboost/data.py | 15 +- src/c_api/c_api.cu | 6 +- src/data/data.cc | 6 +- src/data/data.cu | 5 + src/data/simple_dmatrix.cc | 21 +-- src/data/simple_dmatrix.cu | 8 +- src/data/simple_dmatrix.h | 2 +- tests/ci_build/lint_python.py | 1 + tests/python-gpu/test_from_cudf.py | 213 +++++++++++++++++++---------- 10 files changed, 186 insertions(+), 94 deletions(-) diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 04b489d8b061..bbae92232b9a 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -389,7 +389,8 @@ class SparsePage { /** * \brief Reindex the column index with an offset. */ - void Reindex(uint64_t feature_offset, int32_t n_threads); + void ReindexCPU(uint64_t feature_offset, int32_t n_threads); + void ReindexCUDA(uint64_t feature_offset); void SortRows(int32_t n_threads); diff --git a/python-package/xgboost/data.py b/python-package/xgboost/data.py index 49287d8177cc..4919a186463e 100644 --- a/python-package/xgboost/data.py +++ b/python-package/xgboost/data.py @@ -853,13 +853,18 @@ def _from_cudf_df( feature_names: Optional[FeatureNames], feature_types: Optional[FeatureTypes], enable_categorical: bool, + data_split_mode: DataSplitMode = DataSplitMode.ROW, ) -> DispatchedDataBackendReturnType: data, cat_codes, feature_names, feature_types = _transform_cudf_df( data, feature_names, feature_types, enable_categorical ) interfaces_str = _cudf_array_interfaces(data, cat_codes) handle = ctypes.c_void_p() - config = bytes(json.dumps({"missing": missing, "nthread": nthread}), "utf-8") + config = make_jcargs( + missing=float(missing), + nthread=int(nthread), + data_split_mode=int(data_split_mode), + ) _check_call( _LIB.XGDMatrixCreateFromCudaColumnar( interfaces_str, @@ -1096,7 +1101,13 @@ def dispatch_data_backend( ) if 
_is_cudf_df(data) or _is_cudf_ser(data): return _from_cudf_df( - data, missing, threads, feature_names, feature_types, enable_categorical + data, + missing, + threads, + feature_names, + feature_types, + enable_categorical, + data_split_mode, ) if _is_cupy_array(data): return _from_cupy_array(data, missing, threads, feature_names, feature_types) diff --git a/src/c_api/c_api.cu b/src/c_api/c_api.cu index 84a3715580f4..ef62ccc49248 100644 --- a/src/c_api/c_api.cu +++ b/src/c_api/c_api.cu @@ -96,9 +96,11 @@ XGB_DLL int XGDMatrixCreateFromCudaColumnar(char const *data, float missing = GetMissing(config); auto n_threads = OptionalArg(config, "nthread", 0); + auto data_split_mode = + static_cast(OptionalArg(config, "data_split_mode", 0)); data::CudfAdapter adapter(json_str); - *out = - new std::shared_ptr(DMatrix::Create(&adapter, missing, n_threads)); + *out = new std::shared_ptr( + DMatrix::Create(&adapter, missing, n_threads, "", data_split_mode)); API_END(); } diff --git a/src/data/data.cc b/src/data/data.cc index 7e70fff3fff3..a6c2b986e6c0 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -1045,13 +1045,17 @@ void SparsePage::SortIndices(int32_t n_threads) { }); } -void SparsePage::Reindex(uint64_t feature_offset, int32_t n_threads) { +void SparsePage::ReindexCPU(uint64_t feature_offset, int32_t n_threads) { auto& h_data = this->data.HostVector(); common::ParallelFor(h_data.size(), n_threads, [&](auto i) { h_data[i].index += feature_offset; }); } +#if !defined(XGBOOST_USE_CUDA) +void SparsePage::ReindexCUDA(uint64_t feature_offset) { common::AssertGPUSupport(); } +#endif // !defined(XGBOOST_USE_CUDA) + void SparsePage::SortRows(int32_t n_threads) { auto& h_offset = this->offset.HostVector(); auto& h_data = this->data.HostVector(); diff --git a/src/data/data.cu b/src/data/data.cu index 670af48c7b99..8d2ab2c77268 100644 --- a/src/data/data.cu +++ b/src/data/data.cu @@ -169,6 +169,11 @@ void MetaInfo::SetInfoFromCUDA(Context const& ctx, StringView key, Json 
array) { } } +void SparsePage::ReindexCUDA(uint64_t feature_offset) { + auto d_data = this->data.DeviceSpan(); + dh::LaunchN(d_data.size(), [=] __device__(size_t idx) { d_data[idx].index += feature_offset; }); +} + template DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread, const std::string& cache_prefix, DataSplitMode data_split_mode) { diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index 3814d74d2cec..5494c70e463a 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -74,14 +74,18 @@ DMatrix* SimpleDMatrix::SliceCol(int num_slices, int slice_id) { return out; } -void SimpleDMatrix::ReindexFeatures(Context const* ctx) { +void SimpleDMatrix::ReindexFeatures() { if (info_.IsColumnSplit() && collective::GetWorldSize() > 1) { auto const cols = collective::Allgather(info_.num_col_); auto const offset = std::accumulate(cols.cbegin(), cols.cbegin() + collective::GetRank(), 0ul); if (offset == 0) { return; } - sparse_page_->Reindex(offset, ctx->Threads()); + if (fmat_ctx_.IsCUDA()) { + sparse_page_->ReindexCUDA(offset); + } else { + sparse_page_->ReindexCPU(offset, fmat_ctx_.Threads()); + } } } @@ -216,8 +220,7 @@ BatchSet SimpleDMatrix::GetExtBatches(Context const*, BatchParam template SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread, DataSplitMode data_split_mode) { - Context ctx; - ctx.Init(Args{{"nthread", std::to_string(nthread)}}); + fmat_ctx_.Init(Args{{"nthread", std::to_string(nthread)}}); std::vector qids; uint64_t default_max = std::numeric_limits::max(); @@ -233,7 +236,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread, // Iterate over batches of input data while (adapter->Next()) { auto& batch = adapter->Value(); - auto batch_max_columns = sparse_page_->Push(batch, missing, ctx.Threads()); + auto batch_max_columns = sparse_page_->Push(batch, missing, fmat_ctx_.Threads()); inferred_num_columns = std::max(batch_max_columns, 
inferred_num_columns); total_batch_size += batch.Size(); // Append meta information if available @@ -282,7 +285,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread, // Synchronise worker columns info_.data_split_mode = data_split_mode; - ReindexFeatures(&ctx); + ReindexFeatures(); info_.SynchronizeNumberOfColumns(); if (adapter->NumRows() == kAdapterUnknownSize) { @@ -315,11 +318,9 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread, info_.num_nonzero_ = data_vec.size(); // Sort the index for row partitioners used by variuos tree methods. - if (!sparse_page_->IsIndicesSorted(ctx.Threads())) { - sparse_page_->SortIndices(ctx.Threads()); + if (!sparse_page_->IsIndicesSorted(fmat_ctx_.Threads())) { + sparse_page_->SortIndices(fmat_ctx_.Threads()); } - - this->fmat_ctx_ = ctx; } SimpleDMatrix::SimpleDMatrix(dmlc::Stream* in_stream) { diff --git a/src/data/simple_dmatrix.cu b/src/data/simple_dmatrix.cu index e41d5939463f..9a5bf3940c04 100644 --- a/src/data/simple_dmatrix.cu +++ b/src/data/simple_dmatrix.cu @@ -17,16 +17,13 @@ namespace xgboost::data { template SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, std::int32_t nthread, DataSplitMode data_split_mode) { - CHECK(data_split_mode != DataSplitMode::kCol) - << "Column-wise data split is currently not supported on the GPU."; auto device = (adapter->Device().IsCPU() || adapter->NumRows() == 0) ? 
DeviceOrd::CUDA(dh::CurrentDevice()) : adapter->Device(); CHECK(device.IsCUDA()); dh::safe_cuda(cudaSetDevice(device.ordinal)); - Context ctx; - ctx.Init(Args{{"nthread", std::to_string(nthread)}, {"device", device.Name()}}); + fmat_ctx_.Init(Args{{"nthread", std::to_string(nthread)}, {"device", device.Name()}}); CHECK(adapter->NumRows() != kAdapterUnknownSize); CHECK(adapter->NumColumns() != kAdapterUnknownSize); @@ -42,9 +39,8 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, std::int32_t nthr info_.num_row_ = adapter->NumRows(); // Synchronise worker columns info_.data_split_mode = data_split_mode; + ReindexFeatures(); info_.SynchronizeNumberOfColumns(); - - this->fmat_ctx_ = ctx; } template SimpleDMatrix::SimpleDMatrix(CudfAdapter* adapter, float missing, diff --git a/src/data/simple_dmatrix.h b/src/data/simple_dmatrix.h index 5b5bb2bfb2ba..283fd74835b1 100644 --- a/src/data/simple_dmatrix.h +++ b/src/data/simple_dmatrix.h @@ -69,7 +69,7 @@ class SimpleDMatrix : public DMatrix { * are globally indexed, so we reindex the features based on the offset needed to obtain the * global view. */ - void ReindexFeatures(Context const* ctx); + void ReindexFeatures(); private: // Context used only for DMatrix initialization. 
diff --git a/tests/ci_build/lint_python.py b/tests/ci_build/lint_python.py index 4cd4de8c1817..5caf3fbedd2f 100644 --- a/tests/ci_build/lint_python.py +++ b/tests/ci_build/lint_python.py @@ -26,6 +26,7 @@ class LintersPaths: "tests/python/test_tree_regularization.py", "tests/python/test_shap.py", "tests/python/test_with_pandas.py", + "tests/python-gpu/test_from_cudf.py", "tests/python-gpu/test_gpu_data_iterator.py", "tests/python-gpu/test_gpu_prediction.py", "tests/python-gpu/load_pickle.py", diff --git a/tests/python-gpu/test_from_cudf.py b/tests/python-gpu/test_from_cudf.py index 610c717a96d1..4a3f718131ef 100644 --- a/tests/python-gpu/test_from_cudf.py +++ b/tests/python-gpu/test_from_cudf.py @@ -6,13 +6,16 @@ import xgboost as xgb from xgboost import testing as tm +from xgboost.core import DataSplitMode sys.path.append("tests/python") from test_dmatrix import set_base_margin_info -def dmatrix_from_cudf(input_type, DMatrixT, missing=np.NAN): - '''Test constructing DMatrix from cudf''' +def dmatrix_from_cudf( + input_type, DMatrixT, missing=np.NAN, data_split_mode=DataSplitMode.ROW +): + """Test constructing DMatrix from cudf""" import cudf import pandas as pd @@ -25,9 +28,7 @@ def dmatrix_from_cudf(input_type, DMatrixT, missing=np.NAN): na[5, 0] = missing na[3, 1] = missing - pa = pd.DataFrame({'0': na[:, 0], - '1': na[:, 1], - '2': na[:, 2].astype(np.int32)}) + pa = pd.DataFrame({"0": na[:, 0], "1": na[:, 1], "2": na[:, 2].astype(np.int32)}) np_label = np.random.randn(kRows).astype(input_type) pa_label = pd.DataFrame(np_label) @@ -35,59 +36,79 @@ def dmatrix_from_cudf(input_type, DMatrixT, missing=np.NAN): cd = cudf.from_pandas(pa) cd_label = cudf.from_pandas(pa_label).iloc[:, 0] - dtrain = DMatrixT(cd, missing=missing, label=cd_label) - assert dtrain.num_col() == kCols + dtrain = DMatrixT( + cd, missing=missing, label=cd_label, data_split_mode=data_split_mode + ) + if data_split_mode == DataSplitMode.ROW: + assert dtrain.num_col() == kCols + else: + assert 
dtrain.num_col() == kCols * xgb.collective.get_world_size() assert dtrain.num_row() == kRows -def _test_from_cudf(DMatrixT): - '''Test constructing DMatrix from cudf''' +def _test_from_cudf(DMatrixT, data_split_mode=DataSplitMode.ROW): + """Test constructing DMatrix from cudf""" import cudf - dmatrix_from_cudf(np.float32, DMatrixT, np.NAN) - dmatrix_from_cudf(np.float64, DMatrixT, np.NAN) - dmatrix_from_cudf(np.int8, DMatrixT, 2) - dmatrix_from_cudf(np.int32, DMatrixT, -2) - dmatrix_from_cudf(np.int64, DMatrixT, -3) + world_size = xgb.collective.get_world_size() - cd = cudf.DataFrame({'x': [1, 2, 3], 'y': [0.1, 0.2, 0.3]}) - dtrain = DMatrixT(cd) + dmatrix_from_cudf(np.float32, DMatrixT, np.NAN, data_split_mode) + dmatrix_from_cudf(np.float64, DMatrixT, np.NAN, data_split_mode) - assert dtrain.feature_names == ['x', 'y'] - assert dtrain.feature_types == ['int', 'float'] + dmatrix_from_cudf(np.int8, DMatrixT, 2, data_split_mode) + dmatrix_from_cudf(np.int32, DMatrixT, -2, data_split_mode) + dmatrix_from_cudf(np.int64, DMatrixT, -3, data_split_mode) - series = cudf.DataFrame({'x': [1, 2, 3]}).iloc[:, 0] + cd = cudf.DataFrame({"x": [1, 2, 3], "y": [0.1, 0.2, 0.3]}) + dtrain = DMatrixT(cd, data_split_mode=data_split_mode) + + if data_split_mode == DataSplitMode.ROW: + assert dtrain.feature_names == ["x", "y"] + assert dtrain.feature_types == ["int", "float"] + else: + assert dtrain.feature_names == tm.column_split_feature_names( + ["x", "y"], world_size + ) + assert dtrain.feature_types == ["int", "float"] * world_size + + series = cudf.DataFrame({"x": [1, 2, 3]}).iloc[:, 0] assert isinstance(series, cudf.Series) - dtrain = DMatrixT(series) + dtrain = DMatrixT(series, data_split_mode=data_split_mode) - assert dtrain.feature_names == ['x'] - assert dtrain.feature_types == ['int'] + if data_split_mode == DataSplitMode.ROW: + assert dtrain.feature_names == ["x"] + assert dtrain.feature_types == ["int"] + else: + assert dtrain.feature_names == 
tm.column_split_feature_names(["x"], world_size) + assert dtrain.feature_types == ["int"] * world_size with pytest.raises(ValueError, match=r".*multi.*"): - dtrain = DMatrixT(cd, label=cd) + dtrain = DMatrixT(cd, label=cd, data_split_mode=data_split_mode) xgb.train({"tree_method": "gpu_hist", "objective": "multi:softprob"}, dtrain) # Test when number of elements is less than 8 - X = cudf.DataFrame({'x': cudf.Series([0, 1, 2, np.NAN, 4], - dtype=np.int32)}) - dtrain = DMatrixT(X) - assert dtrain.num_col() == 1 + X = cudf.DataFrame({"x": cudf.Series([0, 1, 2, np.NAN, 4], dtype=np.int32)}) + dtrain = DMatrixT(X, data_split_mode=data_split_mode) + if data_split_mode == DataSplitMode.ROW: + assert dtrain.num_col() == 1 + else: + assert dtrain.num_col() == 1 * world_size assert dtrain.num_row() == 5 # Boolean is not supported. - X_boolean = cudf.DataFrame({'x': cudf.Series([True, False])}) + X_boolean = cudf.DataFrame({"x": cudf.Series([True, False])}) with pytest.raises(Exception): - dtrain = DMatrixT(X_boolean) + dtrain = DMatrixT(X_boolean, data_split_mode=data_split_mode) - y_boolean = cudf.DataFrame({ - 'x': cudf.Series([True, False, True, True, True])}) + y_boolean = cudf.DataFrame({"x": cudf.Series([True, False, True, True, True])}) with pytest.raises(Exception): - dtrain = DMatrixT(X_boolean, label=y_boolean) + dtrain = DMatrixT(X_boolean, label=y_boolean, data_split_mode=data_split_mode) def _test_cudf_training(DMatrixT): import pandas as pd from cudf import DataFrame as df + np.random.seed(1) X = pd.DataFrame(np.random.randn(50, 10)) y = pd.DataFrame(np.random.randn(50)) @@ -97,21 +118,33 @@ def _test_cudf_training(DMatrixT): cudf_base_margin = df.from_pandas(pd.DataFrame(base_margin)) evals_result_cudf = {} - dtrain_cudf = DMatrixT(df.from_pandas(X), df.from_pandas(y), weight=cudf_weights, - base_margin=cudf_base_margin) - params = {'gpu_id': 0, 'tree_method': 'gpu_hist'} - xgb.train(params, dtrain_cudf, evals=[(dtrain_cudf, "train")], - 
evals_result=evals_result_cudf) + dtrain_cudf = DMatrixT( + df.from_pandas(X), + df.from_pandas(y), + weight=cudf_weights, + base_margin=cudf_base_margin, + ) + params = {"gpu_id": 0, "tree_method": "gpu_hist"} + xgb.train( + params, + dtrain_cudf, + evals=[(dtrain_cudf, "train")], + evals_result=evals_result_cudf, + ) evals_result_np = {} dtrain_np = xgb.DMatrix(X, y, weight=weights, base_margin=base_margin) - xgb.train(params, dtrain_np, evals=[(dtrain_np, "train")], - evals_result=evals_result_np) - assert np.array_equal(evals_result_cudf["train"]["rmse"], evals_result_np["train"]["rmse"]) + xgb.train( + params, dtrain_np, evals=[(dtrain_np, "train")], evals_result=evals_result_np + ) + assert np.array_equal( + evals_result_cudf["train"]["rmse"], evals_result_np["train"]["rmse"] + ) def _test_cudf_metainfo(DMatrixT): import pandas as pd from cudf import DataFrame as df + n = 100 X = np.random.random((n, 2)) dmat_cudf = DMatrixT(df.from_pandas(pd.DataFrame(X))) @@ -120,44 +153,68 @@ def _test_cudf_metainfo(DMatrixT): uints = np.array([4, 2, 8]).astype("uint32") cudf_floats = df.from_pandas(pd.DataFrame(floats)) cudf_uints = df.from_pandas(pd.DataFrame(uints)) - dmat.set_float_info('weight', floats) - dmat.set_float_info('label', floats) - dmat.set_float_info('base_margin', floats) - dmat.set_uint_info('group', uints) + dmat.set_float_info("weight", floats) + dmat.set_float_info("label", floats) + dmat.set_float_info("base_margin", floats) + dmat.set_uint_info("group", uints) dmat_cudf.set_info(weight=cudf_floats) dmat_cudf.set_info(label=cudf_floats) dmat_cudf.set_info(base_margin=cudf_floats) dmat_cudf.set_info(group=cudf_uints) # Test setting info with cudf DataFrame - assert np.array_equal(dmat.get_float_info('weight'), dmat_cudf.get_float_info('weight')) - assert np.array_equal(dmat.get_float_info('label'), dmat_cudf.get_float_info('label')) - assert np.array_equal(dmat.get_float_info('base_margin'), - dmat_cudf.get_float_info('base_margin')) - assert 
np.array_equal(dmat.get_uint_info('group_ptr'), dmat_cudf.get_uint_info('group_ptr')) + assert np.array_equal( + dmat.get_float_info("weight"), dmat_cudf.get_float_info("weight") + ) + assert np.array_equal( + dmat.get_float_info("label"), dmat_cudf.get_float_info("label") + ) + assert np.array_equal( + dmat.get_float_info("base_margin"), dmat_cudf.get_float_info("base_margin") + ) + assert np.array_equal( + dmat.get_uint_info("group_ptr"), dmat_cudf.get_uint_info("group_ptr") + ) # Test setting info with cudf Series dmat_cudf.set_info(weight=cudf_floats[cudf_floats.columns[0]]) dmat_cudf.set_info(label=cudf_floats[cudf_floats.columns[0]]) dmat_cudf.set_info(base_margin=cudf_floats[cudf_floats.columns[0]]) dmat_cudf.set_info(group=cudf_uints[cudf_uints.columns[0]]) - assert np.array_equal(dmat.get_float_info('weight'), dmat_cudf.get_float_info('weight')) - assert np.array_equal(dmat.get_float_info('label'), dmat_cudf.get_float_info('label')) - assert np.array_equal(dmat.get_float_info('base_margin'), - dmat_cudf.get_float_info('base_margin')) - assert np.array_equal(dmat.get_uint_info('group_ptr'), dmat_cudf.get_uint_info('group_ptr')) + assert np.array_equal( + dmat.get_float_info("weight"), dmat_cudf.get_float_info("weight") + ) + assert np.array_equal( + dmat.get_float_info("label"), dmat_cudf.get_float_info("label") + ) + assert np.array_equal( + dmat.get_float_info("base_margin"), dmat_cudf.get_float_info("base_margin") + ) + assert np.array_equal( + dmat.get_uint_info("group_ptr"), dmat_cudf.get_uint_info("group_ptr") + ) set_base_margin_info(df, DMatrixT, "gpu_hist") class TestFromColumnar: - '''Tests for constructing DMatrix from data structure conforming Apache -Arrow specification.''' + """Tests for constructing DMatrix from data structure conforming Apache + Arrow specification.""" @pytest.mark.skipif(**tm.no_cudf()) def test_simple_dmatrix_from_cudf(self): _test_from_cudf(xgb.DMatrix) + @pytest.mark.skipif(**tm.no_cudf()) + 
@pytest.mark.skipif(tm.is_windows(), reason="Rabit does not run on windows") + def test_simple_dmatrix_from_cudf_column_split(self): + tm.run_with_rabit( + world_size=3, + test_fn=_test_from_cudf, + DMatrixT=xgb.DMatrix, + data_split_mode=DataSplitMode.COL, + ) + @pytest.mark.skipif(**tm.no_cudf()) def test_device_dmatrix_from_cudf(self): _test_from_cudf(xgb.QuantileDMatrix) @@ -181,6 +238,7 @@ def test_cudf_metainfo_device_dmatrix(self): @pytest.mark.skipif(**tm.no_cudf()) def test_cudf_categorical(self) -> None: import cudf + n_features = 30 _X, _y = tm.make_categorical(100, n_features, 17, False) X = cudf.from_pandas(_X) @@ -251,6 +309,7 @@ def test_cudf_training_with_sklearn(): import pandas as pd from cudf import DataFrame as df from cudf import Series as ss + np.random.seed(1) X = pd.DataFrame(np.random.randn(50, 10)) y = pd.DataFrame((np.random.randn(50) > 0).astype(np.int8)) @@ -264,29 +323,36 @@ def test_cudf_training_with_sklearn(): y_cudf_series = ss(data=y.iloc[:, 0]) for y_obj in [y_cudf, y_cudf_series]: - clf = xgb.XGBClassifier(gpu_id=0, tree_method='gpu_hist') - clf.fit(X_cudf, y_obj, sample_weight=cudf_weights, base_margin=cudf_base_margin, - eval_set=[(X_cudf, y_obj)]) + clf = xgb.XGBClassifier(gpu_id=0, tree_method="gpu_hist") + clf.fit( + X_cudf, + y_obj, + sample_weight=cudf_weights, + base_margin=cudf_base_margin, + eval_set=[(X_cudf, y_obj)], + ) pred = clf.predict(X_cudf) assert np.array_equal(np.unique(pred), np.array([0, 1])) class IterForDMatrixTest(xgb.core.DataIter): - '''A data iterator for XGBoost DMatrix. + """A data iterator for XGBoost DMatrix. `reset` and `next` are required for any data iterator, other functions here are utilites for demonstration's purpose. - ''' - ROWS_PER_BATCH = 100 # data is splited by rows + """ + + ROWS_PER_BATCH = 100 # data is split by rows BATCHES = 16 def __init__(self, categorical): - '''Generate some random data for demostration. + """Generate some random data for demonstration. 
Actual data can be anything that is currently supported by XGBoost. - ''' + """ import cudf + self.rows = self.ROWS_PER_BATCH if categorical: @@ -300,34 +366,39 @@ def __init__(self, categorical): rng = np.random.RandomState(1994) self._data = [ cudf.DataFrame( - {'a': rng.randn(self.ROWS_PER_BATCH), - 'b': rng.randn(self.ROWS_PER_BATCH)})] * self.BATCHES + { + "a": rng.randn(self.ROWS_PER_BATCH), + "b": rng.randn(self.ROWS_PER_BATCH), + } + ) + ] * self.BATCHES self._labels = [rng.randn(self.rows)] * self.BATCHES - self.it = 0 # set iterator to 0 + self.it = 0 # set iterator to 0 super().__init__(cache_prefix=None) def as_array(self): import cudf + return cudf.concat(self._data) def as_array_labels(self): return np.concatenate(self._labels) def data(self): - '''Utility function for obtaining current batch of data.''' + """Utility function for obtaining current batch of data.""" return self._data[self.it] def labels(self): - '''Utility function for obtaining current batch of label.''' + """Utility function for obtaining current batch of label.""" return self._labels[self.it] def reset(self): - '''Reset the iterator''' + """Reset the iterator""" self.it = 0 def next(self, input_data): - '''Yield next batch of data''' + """Yield next batch of data""" if self.it == len(self._data): # Return 0 when there's no more batch. return 0