mirror of
https://github.com/Doctorado-ML/STree.git
synced 2025-08-16 16:06:01 +00:00
#15 Create impurity function in Stree (consistent name, same criteria as other splitter parameter) Create test for the new function Update init test Update test splitter parameters Rename old impurity function to partition_impurity close #15 * Complete implementation of splitter_type = impurity with tests Remove max_distance & min_distance splitter types * Fix mistake in computing multiclass node belief Set default criterion for split to entropy instead of gini Set default max_iter to 1e5 instead of 1e3 change up-down criterion to match SVC multiclass Fix impurity method of splitting nodes Update jupyter Notebooks
445 lines
16 KiB
Python
445 lines
16 KiB
Python
import os
|
|
import unittest
|
|
import warnings
|
|
|
|
import numpy as np
|
|
from sklearn.datasets import load_iris, load_wine
|
|
from sklearn.exceptions import ConvergenceWarning
|
|
from sklearn.svm import LinearSVC
|
|
|
|
from stree import Stree, Snode
|
|
from .utils import load_dataset
|
|
|
|
|
|
class Stree_test(unittest.TestCase):
    """Unit tests for the Stree classifier and the tree of Snode it builds.

    Every test uses ``self._random_state`` as seed and, where the test is
    kernel dependent, iterates over the kernels listed in ``self._kernels``.
    """

    def __init__(self, *args, **kwargs):
        """Store the shared seed and kernel list, then init the TestCase."""
        self._random_state = 1
        self._kernels = ["linear", "rbf", "poly"]
        super().__init__(*args, **kwargs)

    @classmethod
    def setUp(cls):
        """Set the TESTING environment variable before each test."""
        os.environ["TESTING"] = "1"

    def _check_tree(self, node: Snode):
        """Check recursively that the nodes that are not leaves have the
        correct number of labels and its sons have the right number of elements
        in their dataset

        Arguments:
            node {Snode} -- node to check
        """
        if node.is_leaf():
            return
        y_prediction = node._clf.predict(node._X)
        y_down = node.get_down()._y
        y_up = node.get_up()._y
        # Is a correct partition in terms of cardinality?
        # i.e. The partition algorithm didn't forget any sample
        self.assertEqual(node._y.shape[0], y_down.shape[0] + y_up.shape[0])
        unique_y, count_y = np.unique(node._y, return_counts=True)
        # np.unique returns the counts by position, not by label value, so
        # map every label to its count; a label missing in a child counts 0
        down_counts = dict(zip(*np.unique(y_down, return_counts=True)))
        up_counts = dict(zip(*np.unique(y_up, return_counts=True)))
        for label, total in zip(unique_y, count_y):
            number_down = down_counts.get(label, 0)
            number_up = up_counts.get(label, 0)
            self.assertEqual(total, number_down + number_up)
        # Is the partition made the same as the prediction?
        # as the node is not a leaf...
        _, count_yp = np.unique(y_prediction, return_counts=True)
        self.assertEqual(count_yp[1], y_up.shape[0])
        self.assertEqual(count_yp[0], y_down.shape[0])
        self._check_tree(node.get_down())
        self._check_tree(node.get_up())

    def test_build_tree(self):
        """Check if the tree is built the same way as predictions of models"""
        # NOTE: this installs an "ignore everything" warnings filter that is
        # not removed when the test ends
        warnings.filterwarnings("ignore")
        for kernel in self._kernels:
            clf = Stree(kernel=kernel, random_state=self._random_state)
            clf.fit(*load_dataset(self._random_state))
            self._check_tree(clf.tree_)

    def test_single_prediction(self):
        """Check that the first sample is predicted with its true label"""
        X, y = load_dataset(self._random_state)
        first_sample = X[0, :].reshape(-1, X.shape[1])
        for kernel in self._kernels:
            clf = Stree(kernel=kernel, random_state=self._random_state)
            yp = clf.fit(X, y).predict(first_sample)
            self.assertEqual(yp[0], y[0])

    def test_multiple_prediction(self):
        """Check the predictions of the first samples against the truth"""
        # First 27 elements the predictions are the same as the truth
        num = 27
        X, y = load_dataset(self._random_state)
        for kernel in self._kernels:
            clf = Stree(kernel=kernel, random_state=self._random_state)
            yp = clf.fit(X, y).predict(X[:num, :])
            self.assertListEqual(y[:num].tolist(), yp.tolist())

    def test_single_vs_multiple_prediction(self):
        """Check if predicting sample by sample gives the same result as
        predicting all samples at once
        """
        X, y = load_dataset(self._random_state)
        for kernel in self._kernels:
            clf = Stree(kernel=kernel, random_state=self._random_state)
            clf.fit(X, y)
            # Compute prediction line by line; the one-element results are
            # joined once at the end instead of growing an array per sample
            yp_line = np.concatenate(
                [clf.predict(xp.reshape(-1, X.shape[1])) for xp in X]
            )
            # Compute prediction at once
            yp_once = clf.predict(X)
            self.assertListEqual(yp_line.tolist(), yp_once.tolist())

    def test_iterator_and_str(self):
        """Check preorder iterator"""
        # numpy pads every element of an array to the same width, hence the
        # extra blanks inside some of the count arrays
        expected = [
            "root feaures=(0, 1, 2) impurity=1.0000 counts=(array([0, 1]), arr"
            "ay([750, 750]))",
            "root - Down, <cgaf> - Leaf class=0 belief= 0.928297 impurity=0.37"
            "22 counts=(array([0, 1]), array([725,  56]))",
            "root - Up feaures=(0, 1, 2) impurity=0.2178 counts=(array([0, 1])"
            ", array([ 25, 694]))",
            "root - Up - Down feaures=(0, 1, 2) impurity=0.8454 counts=(array("
            "[0, 1]), array([8, 3]))",
            "root - Up - Down - Down, <pure> - Leaf class=0 belief= 1.000000 i"
            "mpurity=0.0000 counts=(array([0]), array([7]))",
            "root - Up - Down - Up, <cgaf> - Leaf class=1 belief= 0.750000 imp"
            "urity=0.8113 counts=(array([0, 1]), array([1, 3]))",
            "root - Up - Up, <cgaf> - Leaf class=1 belief= 0.975989 impurity=0"
            ".1634 counts=(array([0, 1]), array([ 17, 691]))",
        ]
        clf = Stree(kernel="linear", random_state=self._random_state)
        clf.fit(*load_dataset(self._random_state))
        computed = [str(node) for node in clf]
        # str(clf) has to be the nodes, one per line, in iteration order
        expected_string = "".join(f"{line}\n" for line in computed)
        self.assertListEqual(expected, computed)
        self.assertEqual(expected_string, str(clf))

    @staticmethod
    def test_is_a_sklearn_classifier():
        """Run the scikit-learn estimator checks on a default Stree"""
        warnings.filterwarnings("ignore", category=ConvergenceWarning)
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        from sklearn.utils.estimator_checks import check_estimator

        check_estimator(Stree())

    def test_exception_if_C_is_negative(self):
        """Check that fitting with a negative C raises ValueError"""
        tclf = Stree(C=-1)
        with self.assertRaises(ValueError):
            tclf.fit(*load_dataset(self._random_state))

    def test_exception_if_bogus_split_criteria(self):
        """Check that fitting with an unknown split_criteria raises
        ValueError
        """
        tclf = Stree(split_criteria="duck")
        with self.assertRaises(ValueError):
            tclf.fit(*load_dataset(self._random_state))

    def test_check_max_depth_is_positive_or_None(self):
        """Check the default and an explicit max_depth, and that a negative
        one is rejected with ValueError
        """
        tcl = Stree()
        self.assertIsNone(tcl.max_depth)
        tcl = Stree(max_depth=1)
        # the value given has to be kept as is
        self.assertEqual(1, tcl.max_depth)
        with self.assertRaises(ValueError):
            tcl = Stree(max_depth=-1)
            tcl.fit(*load_dataset(self._random_state))

    def test_check_max_depth(self):
        """Check that the fitted tree reaches exactly the max_depth given"""
        depths = (3, 4)
        for depth in depths:
            tcl = Stree(random_state=self._random_state, max_depth=depth)
            tcl.fit(*load_dataset(self._random_state))
            self.assertEqual(depth, tcl.depth_)

    def test_unfitted_tree_is_iterable(self):
        """Check that iterating an unfitted Stree yields no nodes"""
        tcl = Stree()
        self.assertEqual(0, len(list(tcl)))

    def test_min_samples_split(self):
        """Check that the root is split with min_samples_split=3 and has no
        children with min_samples_split=4 on a dataset of 3 samples
        """
        dataset = [[1], [2], [3]], [1, 1, 0]
        tcl_split = Stree(min_samples_split=3).fit(*dataset)
        self.assertIsNotNone(tcl_split.tree_.get_down())
        self.assertIsNotNone(tcl_split.tree_.get_up())
        tcl_nosplit = Stree(min_samples_split=4).fit(*dataset)
        self.assertIsNone(tcl_nosplit.tree_.get_down())
        self.assertIsNone(tcl_nosplit.tree_.get_up())

    def test_simple_muticlass_dataset(self):
        """Check score, predictions and classes_ on a 3 samples / 3 classes
        dataset
        """
        px = [[1, 2], [5, 6], [9, 10]]
        py = [0, 1, 2]
        for kernel in self._kernels:
            clf = Stree(
                kernel=kernel,
                split_criteria="max_samples",
                random_state=self._random_state,
            )
            clf.fit(px, py)
            self.assertEqual(1.0, clf.score(px, py))
            self.assertListEqual(py, clf.predict(px).tolist())
            self.assertListEqual(py, clf.classes_.tolist())

    def test_muticlass_dataset(self):
        """Check the accuracy obtained in two multiclass datasets with every
        kernel
        """
        datasets = {
            "Synt": load_dataset(random_state=self._random_state, n_classes=3),
            # NOTE(review): the key says Iris but the data loaded is wine
            "Iris": load_wine(return_X_y=True),
        }
        outcomes = {
            "Synt": {
                "max_samples linear": 0.9606666666666667,
                "max_samples rbf": 0.7133333333333334,
                "max_samples poly": 0.49066666666666664,
                "impurity linear": 0.9606666666666667,
                "impurity rbf": 0.7133333333333334,
                "impurity poly": 0.49066666666666664,
            },
            "Iris": {
                "max_samples linear": 1.0,
                "max_samples rbf": 0.6910112359550562,
                "max_samples poly": 0.6966292134831461,
                "impurity linear": 1,
                "impurity rbf": 0.6910112359550562,
                "impurity poly": 0.6966292134831461,
            },
        }

        for name, dataset in datasets.items():
            px, py = dataset
            for criteria in ["max_samples", "impurity"]:
                for kernel in self._kernels:
                    with self.subTest(
                        dataset=name, criteria=criteria, kernel=kernel
                    ):
                        # NOTE(review): criteria is only used to look up the
                        # expected outcome, it is not passed to Stree, so
                        # both criteria exercise the same classifier;
                        # confirm whether split_criteria=criteria was meant
                        clf = Stree(
                            C=55,
                            max_iter=1e5,
                            kernel=kernel,
                            random_state=self._random_state,
                        )
                        clf.fit(px, py)
                        outcome = outcomes[name][f"{criteria} {kernel}"]
                        self.assertAlmostEqual(outcome, clf.score(px, py))

    def test_max_features(self):
        """Check the number of features computed for every kind of
        max_features and that bogus values raise ValueError
        """
        n_features = 16
        expected_values = [
            ("auto", 4),
            ("log2", 4),
            ("sqrt", 4),
            (0.5, 8),
            (3, 3),
            (None, 16),
        ]
        clf = Stree()
        # set by hand as the classifier is not fitted in this test
        clf.n_features_ = n_features
        for max_features, expected in expected_values:
            with self.subTest(max_features=max_features):
                clf.set_params(max_features=max_features)
                computed = clf._initialize_max_features()
                self.assertEqual(expected, computed)
        # Check bogus max_features
        values = ["duck", -0.1, 0.0]
        for max_features in values:
            with self.subTest(max_features=max_features):
                clf.set_params(max_features=max_features)
                with self.assertRaises(ValueError):
                    _ = clf._initialize_max_features()

    def test_get_subspaces(self):
        """Check that get_subspace returns the columns of the dataset
        selected by the indices and as many of them as max_features states
        """
        # seeded generator so that the test data is the same in every run
        rng = np.random.RandomState(self._random_state)
        dataset = rng.random_sample((10, 16))
        y = rng.randint(0, 2, 10)
        expected_values = [
            ("auto", 4),
            ("log2", 4),
            ("sqrt", 4),
            (0.5, 8),
            (3, 3),
            (None, 16),
        ]
        clf = Stree()
        for max_features, expected in expected_values:
            with self.subTest(max_features=max_features):
                clf.set_params(max_features=max_features)
                clf.fit(dataset, y)
                computed, indices = clf.splitter_.get_subspace(
                    dataset, y, clf.max_features_
                )
                self.assertListEqual(
                    dataset[:, indices].tolist(), computed.tolist()
                )
                self.assertEqual(expected, len(indices))

    def test_bogus_criterion(self):
        """Check that fitting with an unknown criterion raises ValueError"""
        clf = Stree(criterion="duck")
        with self.assertRaises(ValueError):
            clf.fit(*load_dataset())

    def test_predict_feature_dimensions(self):
        """Check that predicting with fewer features than the ones used to
        fit raises ValueError
        """
        # seeded generator so that the test data is the same in every run
        rng = np.random.RandomState(self._random_state)
        X = rng.rand(10, 5)
        y = rng.randint(0, 2, 10)
        clf = Stree()
        clf.fit(X, y)
        with self.assertRaises(ValueError):
            clf.predict(X[:, :3])

    # Tests of score

    def test_score_binary(self):
        """Check that score matches the accuracy computed by hand and the
        accuracy expected for every kernel
        """
        X, y = load_dataset(self._random_state)
        accuracies = [
            0.9506666666666667,
            0.9606666666666667,
            0.9433333333333334,
        ]
        for kernel, accuracy_expected in zip(self._kernels, accuracies):
            with self.subTest(kernel=kernel):
                clf = Stree(
                    random_state=self._random_state,
                    kernel=kernel,
                )
                clf.fit(X, y)
                accuracy_score = clf.score(X, y)
                yp = clf.predict(X)
                accuracy_computed = np.mean(yp == y)
                self.assertEqual(accuracy_score, accuracy_computed)
                self.assertAlmostEqual(accuracy_expected, accuracy_score)

    def test_score_max_features(self):
        """Check the accuracy expected when max_features is 2"""
        X, y = load_dataset(self._random_state)
        clf = Stree(random_state=self._random_state, max_features=2)
        clf.fit(X, y)
        self.assertAlmostEqual(0.944, clf.score(X, y))

    def test_bogus_splitter_parameter(self):
        """Check that fitting with an unknown splitter raises ValueError"""
        clf = Stree(splitter="duck")
        with self.assertRaises(ValueError):
            clf.fit(*load_dataset())

    def test_weights_removing_class(self):
        """Check that train adds epsilon to the weights when the zero
        weights erase a class and leaves them untouched otherwise
        """
        # This patch solves an stderr message from sklearn svm lib
        # "WARNING: class label x specified in weight is not found"
        X = np.array(
            [
                [0.1, 0.1],
                [0.1, 0.2],
                [0.2, 0.1],
                [5, 6],
                [8, 9],
                [6, 7],
                [0.2, 0.2],
            ]
        )
        y = np.array([0, 0, 0, 1, 1, 1, 0])
        epsilon = 1e-5
        # every sample of class 1 has zero weight
        weights = [1, 1, 1, 0, 0, 0, 1]
        weights = np.array(weights, dtype="float64")
        # computed before train as the assertions below expect train to
        # update the weights array in place
        weights_epsilon = [x + epsilon for x in weights]
        weights_no_zero = np.array([1, 1, 1, 0, 0, 2, 1])
        original = weights_no_zero.copy()
        clf = Stree()
        clf.fit(X, y)
        node = clf.train(
            X,
            y,
            weights,
            1,
            "test",
        )
        # if a class is lost with zero weights the patch adds epsilon
        self.assertListEqual(weights.tolist(), weights_epsilon)
        self.assertListEqual(node._sample_weight.tolist(), weights_epsilon)
        # zero weights are ok when they don't erase a class
        _ = clf.train(X, y, weights_no_zero, 1, "test")
        self.assertListEqual(weights_no_zero.tolist(), original.tolist())

    def test_multiclass_classifier_integrity(self):
        """Checks if the multiclass operation is done right"""
        X, y = load_iris(return_X_y=True)
        clf = Stree(random_state=0)
        clf.fit(X, y)
        score = clf.score(X, y)
        # Check accuracy of the whole model
        self.assertAlmostEqual(0.98, score, 5)
        svm = LinearSVC(random_state=0)
        svm.fit(X, y)
        self.assertAlmostEqual(0.9666666666666667, svm.score(X, y), 5)
        data = svm.decision_function(X)
        expected = [
            0.4444444444444444,
            0.35777777777777775,
            0.4569777777777778,
        ]
        # binarize every column of the decision function by its sign
        ty = data.copy()
        ty[data <= 0] = 0
        ty[data > 0] = 1
        ty = ty.astype(int)
        for i in range(3):
            self.assertAlmostEqual(
                expected[i],
                clf.splitter_._gini(ty[:, i]),
            )
        # 1st Branch
        # up should have 53 samples of classes [1, 2] [3, 50]
        # down should have 97 samples of classes [0, 1] [50, 47]
        up = data[:, 2] > 0
        resup = np.unique(y[up], return_counts=True)
        resdn = np.unique(y[~up], return_counts=True)
        self.assertListEqual([1, 2], resup[0].tolist())
        self.assertListEqual([3, 50], resup[1].tolist())
        self.assertListEqual([0, 1], resdn[0].tolist())
        self.assertListEqual([50, 47], resdn[1].tolist())
        # 2nd Branch
        # up should have 53 samples of classes [1, 2] [3, 50]
        # down should have 47 samples of class 1
        node_up = clf.tree_.get_down().get_up()
        node_dn = clf.tree_.get_down().get_down()
        resup = np.unique(node_up._y, return_counts=True)
        resdn = np.unique(node_dn._y, return_counts=True)
        self.assertListEqual([1, 2], resup[0].tolist())
        self.assertListEqual([3, 50], resup[1].tolist())
        self.assertListEqual([1], resdn[0].tolist())
        self.assertListEqual([47], resdn[1].tolist())

    def test_score_multiclass_rbf(self):
        """Check the accuracy of the rbf kernel in two multiclass datasets"""
        X, y = load_dataset(
            random_state=self._random_state,
            n_classes=3,
            n_features=5,
            n_samples=500,
        )
        clf = Stree(kernel="rbf", random_state=self._random_state)
        self.assertAlmostEqual(0.824, clf.fit(X, y).score(X, y))
        X, y = load_wine(return_X_y=True)
        self.assertAlmostEqual(0.6741573033707865, clf.fit(X, y).score(X, y))

    def test_score_multiclass_poly(self):
        """Check the accuracy of the poly kernel in two multiclass datasets"""
        X, y = load_dataset(
            random_state=self._random_state,
            n_classes=3,
            n_features=5,
            n_samples=500,
        )
        clf = Stree(
            kernel="poly", random_state=self._random_state, C=10, degree=5
        )
        self.assertAlmostEqual(0.786, clf.fit(X, y).score(X, y))
        X, y = load_wine(return_X_y=True)
        self.assertAlmostEqual(0.702247191011236, clf.fit(X, y).score(X, y))

    def test_score_multiclass_linear(self):
        """Check the accuracy of the linear kernel in two multiclass
        datasets
        """
        X, y = load_dataset(
            random_state=self._random_state,
            n_classes=3,
            n_features=5,
            n_samples=1500,
        )
        clf = Stree(kernel="linear", random_state=self._random_state)
        self.assertAlmostEqual(0.9533333333333334, clf.fit(X, y).score(X, y))
        X, y = load_wine(return_X_y=True)
        self.assertAlmostEqual(0.9550561797752809, clf.fit(X, y).score(X, y))
|