-
-
Notifications
You must be signed in to change notification settings - Fork 8.7k
/
test_data.py
141 lines (116 loc) · 4.53 KB
/
test_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sys
from typing import List
import numpy as np
import pandas as pd
import pytest
import testing as tm
# Skip the entire module when the local `testing` helpers report that
# PySpark is unavailable; `no_spark()` supplies both the condition and
# a human-readable reason.
if tm.no_spark()["condition"]:
    pytest.skip(msg=tm.no_spark()["reason"], allow_module_level=True)
# PySpark tests are also skipped on Windows and macOS platforms.
if sys.platform.startswith("win") or sys.platform.startswith("darwin"):
    pytest.skip("Skipping PySpark tests on Windows", allow_module_level=True)
from xgboost.spark.data import (
_read_csr_matrix_from_unwrapped_spark_vec,
alias,
create_dmatrix_from_partitions,
stack_series,
)
def test_stack() -> None:
    """stack_series should turn a Series of row sequences into a 2-D array
    whose shape is (number of rows, length of each row)."""
    cases = [
        ([[1, 2], [3, 4]], (2, 2)),
        ([[1], [3]], (2, 1)),
        ([np.array([1, 2]), np.array([3, 4])], (2, 2)),
        ([np.array([1]), np.array([3])], (2, 1)),
    ]
    for rows, expected_shape in cases:
        frame = pd.DataFrame({"a": rows})
        stacked = stack_series(frame["a"])
        assert stacked.shape == expected_shape
def run_dmatrix_ctor(is_dqm: bool) -> None:
    """Build DMatrix objects from simulated Spark partitions and verify the
    round trip of rows, columns, labels, weights, margins, and feature types.

    Parameters
    ----------
    is_dqm :
        When True, lay features out as one numeric column per feature (the
        quantile/iterator path of ``create_dmatrix_from_partitions``);
        otherwise pack all features into a single array-valued column.
    """
    rng = np.random.default_rng(0)
    dfs: List[pd.DataFrame] = []
    n_features = 16
    n_samples_per_batch = 16
    n_batches = 10
    feature_types = ["float"] * n_features
    for i in range(n_batches):
        # Derive sizes from the declared constants instead of hard-coding
        # 256 / 16, so the three knobs above can be changed independently
        # without silently desynchronizing the generated data.
        X = rng.normal(loc=0, size=n_samples_per_batch * n_features).reshape(
            n_samples_per_batch, n_features
        )
        y = rng.normal(loc=0, size=n_samples_per_batch)
        m = rng.normal(loc=0, size=n_samples_per_batch)
        w = rng.normal(loc=0.5, scale=0.5, size=n_samples_per_batch)
        w -= w.min()  # shift so all weights are non-negative
        valid = rng.binomial(n=1, p=0.5, size=n_samples_per_batch).astype(np.bool_)
        df = pd.DataFrame(
            {alias.label: y, alias.margin: m, alias.weight: w, alias.valid: valid}
        )
        if is_dqm:
            # One column per feature.
            for j in range(X.shape[1]):
                df[f"feat-{j}"] = pd.Series(X[:, j])
        else:
            # All features packed into a single array-valued column.
            df[alias.data] = pd.Series(list(X))
        dfs.append(df)

    kwargs = {"feature_types": feature_types}
    if is_dqm:
        cols = [f"feat-{i}" for i in range(n_features)]
        train_Xy, valid_Xy = create_dmatrix_from_partitions(
            iter(dfs), cols, 0, kwargs, False, True
        )
    else:
        train_Xy, valid_Xy = create_dmatrix_from_partitions(
            iter(dfs), None, None, kwargs, False, True
        )

    assert valid_Xy is not None
    assert valid_Xy.num_row() + train_Xy.num_row() == n_samples_per_batch * n_batches
    assert train_Xy.num_col() == n_features
    assert valid_Xy.num_col() == n_features

    # Rows flagged by alias.valid must end up in the validation matrix; the
    # rest in the training matrix.
    df = pd.concat(dfs, axis=0)
    df_train = df.loc[~df[alias.valid], :]
    df_valid = df.loc[df[alias.valid], :]
    assert df_train.shape[0] == train_Xy.num_row()
    assert df_valid.shape[0] == valid_Xy.num_row()
    # margin
    np.testing.assert_allclose(
        df_train[alias.margin].to_numpy(), train_Xy.get_base_margin()
    )
    np.testing.assert_allclose(
        df_valid[alias.margin].to_numpy(), valid_Xy.get_base_margin()
    )
    # weight
    np.testing.assert_allclose(df_train[alias.weight].to_numpy(), train_Xy.get_weight())
    np.testing.assert_allclose(df_valid[alias.weight].to_numpy(), valid_Xy.get_weight())
    # label
    np.testing.assert_allclose(df_train[alias.label].to_numpy(), train_Xy.get_label())
    np.testing.assert_allclose(df_valid[alias.label].to_numpy(), valid_Xy.get_label())
    np.testing.assert_equal(train_Xy.feature_types, feature_types)
    np.testing.assert_equal(valid_Xy.feature_types, feature_types)
def test_dmatrix_ctor() -> None:
    """Exercise the non-quantile (single array-valued column) DMatrix path."""
    run_dmatrix_ctor(is_dqm=False)
def test_read_csr_matrix_from_unwrapped_spark_vec() -> None:
    """A mix of sparse (type 0) and dense (type 1) unwrapped Spark vectors
    should be assembled into a single scipy CSR matrix."""
    from scipy.sparse import csr_matrix

    vec_types = [0, 1, 1, 0]
    vec_sizes = [3, None, None, 3]
    vec_indices = [
        np.array([0, 2], dtype=np.int32),
        None,
        None,
        np.array([1, 2], dtype=np.int32),
    ]
    vec_values = [
        np.array([3.0, 0.0], dtype=np.float64),
        np.array([13.0, 14.0, 0.0], dtype=np.float64),
        np.array([0.0, 24.0, 25.0], dtype=np.float64),
        np.array([0.0, 35.0], dtype=np.float64),
    ]
    frame = pd.DataFrame(
        {
            "featureVectorType": vec_types,
            "featureVectorSize": vec_sizes,
            "featureVectorIndices": vec_indices,
            "featureVectorValues": vec_values,
        }
    )

    matrix = _read_csr_matrix_from_unwrapped_spark_vec(frame)
    assert isinstance(matrix, csr_matrix)
    np.testing.assert_array_equal(
        matrix.data, [3.0, 0.0, 13.0, 14.0, 0.0, 0.0, 24.0, 25.0, 0.0, 35.0]
    )
    np.testing.assert_array_equal(matrix.indptr, [0, 2, 5, 8, 10])
    np.testing.assert_array_equal(matrix.indices, [0, 2, 0, 1, 2, 0, 1, 2, 1, 2])
    assert matrix.shape == (4, 3)