forked from dmlc/xgboost
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_data.py
159 lines (132 loc) · 5.2 KB
/
test_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import sys
from typing import List
import numpy as np
import pandas as pd
import pytest
import testing as tm
# Skip the whole module when Spark test dependencies are unavailable.
# Call tm.no_spark() once instead of twice, and use the `reason` keyword:
# pytest.skip's `msg` keyword was deprecated in pytest 7.0 and removed in 8.
_no_spark = tm.no_spark()
if _no_spark["condition"]:
    pytest.skip(reason=_no_spark["reason"], allow_module_level=True)
# PySpark tests are not supported on Windows or macOS (darwin).
if sys.platform.startswith("win") or sys.platform.startswith("darwin"):
    pytest.skip("Skipping PySpark tests on Windows and macOS", allow_module_level=True)
from xgboost.spark.data import (
_read_csr_matrix_from_unwrapped_spark_vec,
alias,
create_dmatrix_from_partitions,
stack_series,
)
from xgboost import DMatrix, QuantileDMatrix
def test_stack() -> None:
    """stack_series flattens a series of row-sequences into a 2-d ndarray."""
    # (column values, expected stacked shape) — lists and ndarrays must behave alike.
    cases = [
        ([[1, 2], [3, 4]], (2, 2)),
        ([[1], [3]], (2, 1)),
        ([np.array([1, 2]), np.array([3, 4])], (2, 2)),
        ([np.array([1]), np.array([3])], (2, 1)),
    ]
    for column, expected_shape in cases:
        frame = pd.DataFrame({"a": column})
        stacked = stack_series(frame["a"])
        assert stacked.shape == expected_shape
def run_dmatrix_ctor(is_feature_cols: bool, is_qdm: bool, on_gpu: bool) -> None:
    """Build train/valid DMatrix objects from synthetic partitions and verify them.

    Parameters
    ----------
    is_feature_cols :
        If True, store each feature in its own ``feat-i`` column; otherwise
        store all features in a single array-valued column.
    is_qdm :
        If True, expect ``QuantileDMatrix`` instances; otherwise plain ``DMatrix``.
    on_gpu :
        If True, pass GPU ordinal 0 to the constructor; otherwise run on CPU.
    """
    rng = np.random.default_rng(0)
    dfs: List[pd.DataFrame] = []
    n_features = 16
    n_samples_per_batch = 16
    n_batches = 10
    feature_types = ["float"] * n_features
    for _ in range(n_batches):
        # Derive sizes from the declared dimensions (previously hard-coded 256
        # and 16) so the constants above remain the single source of truth.
        X = rng.normal(
            loc=0, size=n_samples_per_batch * n_features
        ).reshape(n_samples_per_batch, n_features)
        y = rng.normal(loc=0, size=n_samples_per_batch)
        m = rng.normal(loc=0, size=n_samples_per_batch)
        # Shift weights so they are non-negative.
        w = rng.normal(loc=0.5, scale=0.5, size=n_samples_per_batch)
        w -= w.min()
        # Random train/validation split indicator per row.
        valid = rng.binomial(n=1, p=0.5, size=n_samples_per_batch).astype(np.bool_)
        df = pd.DataFrame(
            {alias.label: y, alias.margin: m, alias.weight: w, alias.valid: valid}
        )
        if is_feature_cols:
            for j in range(X.shape[1]):
                df[f"feat-{j}"] = pd.Series(X[:, j])
        else:
            df[alias.data] = pd.Series(list(X))
        dfs.append(df)

    kwargs = {"feature_types": feature_types}
    device_id = 0 if on_gpu else None
    cols = [f"feat-{i}" for i in range(n_features)]
    feature_cols = cols if is_feature_cols else None
    train_Xy, valid_Xy = create_dmatrix_from_partitions(
        iter(dfs),
        feature_cols,
        gpu_id=device_id,
        use_qdm=is_qdm,
        kwargs=kwargs,
        enable_sparse_data_optim=False,
        has_validation_col=True,
    )
    if is_qdm:
        assert isinstance(train_Xy, QuantileDMatrix)
        assert isinstance(valid_Xy, QuantileDMatrix)
    else:
        # QuantileDMatrix subclasses DMatrix, so exclude it explicitly.
        assert not isinstance(train_Xy, QuantileDMatrix)
        assert isinstance(train_Xy, DMatrix)
        assert not isinstance(valid_Xy, QuantileDMatrix)
        assert isinstance(valid_Xy, DMatrix)

    # Every row must land in exactly one of the two matrices.
    assert valid_Xy is not None
    assert valid_Xy.num_row() + train_Xy.num_row() == n_samples_per_batch * n_batches
    assert train_Xy.num_col() == n_features
    assert valid_Xy.num_col() == n_features

    # Re-derive the expected split from the concatenated partitions.
    df = pd.concat(dfs, axis=0)
    df_train = df.loc[~df[alias.valid], :]
    df_valid = df.loc[df[alias.valid], :]
    assert df_train.shape[0] == train_Xy.num_row()
    assert df_valid.shape[0] == valid_Xy.num_row()
    # margin
    np.testing.assert_allclose(
        df_train[alias.margin].to_numpy(), train_Xy.get_base_margin()
    )
    np.testing.assert_allclose(
        df_valid[alias.margin].to_numpy(), valid_Xy.get_base_margin()
    )
    # weight
    np.testing.assert_allclose(df_train[alias.weight].to_numpy(), train_Xy.get_weight())
    np.testing.assert_allclose(df_valid[alias.weight].to_numpy(), valid_Xy.get_weight())
    # label
    np.testing.assert_allclose(df_train[alias.label].to_numpy(), train_Xy.get_label())
    np.testing.assert_allclose(df_valid[alias.label].to_numpy(), valid_Xy.get_label())

    np.testing.assert_equal(train_Xy.feature_types, feature_types)
    np.testing.assert_equal(valid_Xy.feature_types, feature_types)
@pytest.mark.parametrize(
    "is_feature_cols,is_qdm",
    [(True, True), (True, False), (False, True), (False, False)],
)
def test_dmatrix_ctor(is_feature_cols: bool, is_qdm: bool) -> None:
    # Exercise every combination of feature-column layout and QuantileDMatrix
    # usage on CPU only (on_gpu=False); GPU paths are not covered here.
    run_dmatrix_ctor(is_feature_cols, is_qdm, on_gpu=False)
def test_read_csr_matrix_from_unwrapped_spark_vec() -> None:
    """Sparse (type 0) and dense (type 1) Spark vectors combine into one CSR matrix."""
    from scipy.sparse import csr_matrix

    # Rows 0 and 3 are sparse (explicit size + indices); rows 1 and 2 are dense.
    vector_types = [0, 1, 1, 0]
    vector_sizes = [3, None, None, 3]
    vector_indices = [
        np.array([0, 2], dtype=np.int32),
        None,
        None,
        np.array([1, 2], dtype=np.int32),
    ]
    vector_values = [
        np.array([3.0, 0.0], dtype=np.float64),
        np.array([13.0, 14.0, 0.0], dtype=np.float64),
        np.array([0.0, 24.0, 25.0], dtype=np.float64),
        np.array([0.0, 35.0], dtype=np.float64),
    ]
    frame = pd.DataFrame(
        {
            "featureVectorType": vector_types,
            "featureVectorSize": vector_sizes,
            "featureVectorIndices": vector_indices,
            "featureVectorValues": vector_values,
        }
    )

    csr = _read_csr_matrix_from_unwrapped_spark_vec(frame)
    assert isinstance(csr, csr_matrix)
    # Explicitly stored zeros are preserved in the CSR data array.
    expected_data = [3.0, 0.0, 13.0, 14.0, 0.0, 0.0, 24.0, 25.0, 0.0, 35.0]
    np.testing.assert_array_equal(csr.data, expected_data)
    np.testing.assert_array_equal(csr.indptr, [0, 2, 5, 8, 10])
    np.testing.assert_array_equal(csr.indices, [0, 2, 0, 1, 2, 0, 1, 2, 1, 2])
    assert csr.shape == (4, 3)