forked from dmlc/xgboost
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
144 lines (121 loc) · 4.16 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import contextlib
import logging
import shutil
import sys
import tempfile
import unittest
import pytest
import testing as tm
from six import StringIO
# Skip the whole module when PySpark is unavailable. The reason is passed
# positionally: the ``msg=`` keyword was deprecated in pytest 7 and removed in
# pytest 8, whereas the first positional parameter works on every version.
if tm.no_spark()["condition"]:
    pytest.skip(tm.no_spark()["reason"], allow_module_level=True)
# The PySpark tests are skipped on both Windows and macOS.
if sys.platform.startswith(("win", "darwin")):
    pytest.skip(
        "Skipping PySpark tests on Windows and macOS", allow_module_level=True
    )
from pyspark.sql import SparkSession, SQLContext
from xgboost.spark.utils import _get_default_params_from_func
class UtilsTest(unittest.TestCase):
    """Unit tests for helpers imported from ``xgboost.spark.utils``."""

    def test_get_default_params(self):
        """Check the defaults returned for a method with keyword parameters."""

        class Foo:
            def func1(self, x, y, key1=None, key2="val2", key3=0, key4=None):
                pass

        # key2 and key4 are passed as unsupported, so only key1 and key3
        # are expected in the result.
        excluded = {"key2", "key4"}
        wanted = {"key1": None, "key3": 0}

        got = _get_default_params_from_func(Foo.func1, excluded)

        # Same number of entries, and every returned entry matches.
        self.assertEqual(len(wanted), len(got.keys()))
        for name, default in got.items():
            self.assertEqual(wanted[name], default)
@contextlib.contextmanager
def patch_stdout():
    """Temporarily replace ``sys.stdout`` with an in-memory text buffer.

    Yields the buffer so the caller can inspect what was written. The
    previous ``sys.stdout`` is put back when the ``with`` block ends,
    whether it finishes normally or raises.
    """
    captured = StringIO()
    # redirect_stdout swaps sys.stdout on entry and restores it on exit.
    with contextlib.redirect_stdout(captured):
        yield captured
@contextlib.contextmanager
def patch_logger(name):
    """Capture what the logger called ``name`` emits into a text buffer.

    A stream handler writing to a fresh buffer is attached to the logger
    for the duration of the ``with`` block and detached afterwards, even
    if the block raises. The buffer is yielded to the caller.

    :param name: name of the logger to attach the handler to
    """
    buffer = StringIO()
    capture = logging.StreamHandler(buffer)
    target = logging.getLogger(name)
    target.addHandler(capture)
    try:
        yield buffer
    finally:
        target.removeHandler(capture)
class TestTempDir:
    """Helper base class that manages one scratch directory per class."""

    @classmethod
    def make_tempdir(cls):
        """Create a new temporary directory and record its path.

        The path is stored in ``cls.tempdir``; the directory name starts
        with ``sparkdl_tests``.
        """
        scratch = tempfile.mkdtemp(prefix="sparkdl_tests")
        cls.tempdir = scratch

    @classmethod
    def remove_tempdir(cls):
        """Recursively delete the directory recorded in ``cls.tempdir``."""
        scratch = cls.tempdir
        shutil.rmtree(scratch)
class TestSparkContext:
    """Helper base class that owns a class-level Spark session."""

    @classmethod
    def setup_env(cls, spark_config):
        """Build (or reuse) a Spark session configured from ``spark_config``.

        Each key/value pair is applied to the session builder. The
        resulting session is stored in ``cls.session`` and its Spark
        context in ``cls.sc``. The ``pyspark`` logger is set to INFO.

        :param spark_config: mapping of Spark configuration keys to values
        """
        session_builder = SparkSession.builder.appName(
            "xgboost spark python API Tests"
        )
        for option, setting in spark_config.items():
            session_builder.config(option, setting)
        session = session_builder.getOrCreate()

        pyspark_log = logging.getLogger("pyspark")
        pyspark_log.setLevel(logging.INFO)

        cls.sc = session.sparkContext
        cls.session = session

    @classmethod
    def tear_down_env(cls):
        """Stop the session, then the context, clearing each attribute."""
        # Order matters: the session is stopped and cleared before the context.
        for attribute in ("session", "sc"):
            getattr(cls, attribute).stop()
            setattr(cls, attribute, None)
class SparkTestCase(TestSparkContext, TestTempDir, unittest.TestCase):
    """Base test case that runs against a ``local[4]`` Spark master.

    A Spark session and a scratch directory are created once per test
    class and released again when the class finishes.
    """

    @classmethod
    def setUpClass(cls):
        """Start the Spark session, then create the scratch directory."""
        cls.setup_env(
            {
                "spark.master": "local[4]",
                "spark.python.worker.reuse": "false",
                "spark.driver.host": "127.0.0.1",
                "spark.task.maxFailures": "1",
                "spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
                "spark.sql.pyspark.jvmStacktrace.enabled": "true",
            }
        )
        cls.make_tempdir()

    @classmethod
    def tearDownClass(cls):
        """Delete the scratch directory and stop the Spark session."""
        try:
            cls.remove_tempdir()
        finally:
            # Stop Spark even when removing the directory fails; otherwise
            # the session would be left running for later test classes.
            cls.tear_down_env()
class SparkLocalClusterTestCase(TestSparkContext, TestTempDir, unittest.TestCase):
    """Base test case that runs against a ``local-cluster[2, 2, 1024]`` master.

    A Spark session and a scratch directory are created once per test
    class and released again when the class finishes.
    """

    @classmethod
    def setUpClass(cls):
        """Start the Spark session, create the scratch directory, and wait
        for the workers by running a dummy barrier job."""
        cls.setup_env(
            {
                "spark.master": "local-cluster[2, 2, 1024]",
                "spark.python.worker.reuse": "false",
                "spark.driver.host": "127.0.0.1",
                "spark.task.maxFailures": "1",
                "spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
                "spark.sql.pyspark.jvmStacktrace.enabled": "true",
                "spark.cores.max": "4",
                "spark.task.cpus": "1",
                "spark.executor.cores": "2",
            }
        )
        cls.make_tempdir()
        # We run a dummy job so that we block until the workers have connected to the master
        cls.sc.parallelize(range(4), 4).barrier().mapPartitions(lambda _: []).collect()

    @classmethod
    def tearDownClass(cls):
        """Delete the scratch directory and stop the Spark session."""
        try:
            cls.remove_tempdir()
        finally:
            # Stop Spark even when removing the directory fails; otherwise
            # the session would be left running for later test classes.
            cls.tear_down_env()