Compare commits

...

5 Commits

Author SHA1 Message Date
eef076dcba Add python 3.10 to CI 2022-06-01 01:58:09 +02:00
9e8d03d088 Add predict_proba test 2022-05-31 23:46:12 +02:00
0a78d5be67 Implement optimized predict and new predict_proba 2022-05-31 19:12:48 +02:00
65923af9b4 Add complete classes counts to node and tests 2022-05-31 01:21:03 +02:00
Ricardo Montañana Gómez
93be8a89a8 Graphviz (#52)
* Add graphviz representation of the tree

* Complete graphviz test
Add comments to some tests

* Add optional title to tree graph

* Add fontcolor keyword to nodes of the tree

* Add color keyword to arrows of graph

* Update version file to 1.2.4
2022-04-17 19:47:58 +02:00
6 changed files with 215 additions and 76 deletions

View File

@@ -13,7 +13,7 @@ jobs:
strategy: strategy:
matrix: matrix:
os: [macos-latest, ubuntu-latest, windows-latest] os: [macos-latest, ubuntu-latest, windows-latest]
python: [3.8] python: [3.8, "3.10"]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@@ -68,6 +68,7 @@ class Snode:
self._impurity = impurity self._impurity = impurity
self._partition_column: int = -1 self._partition_column: int = -1
self._scaler = scaler self._scaler = scaler
self._proba = None
@classmethod @classmethod
def copy(cls, node: "Snode") -> "Snode": def copy(cls, node: "Snode") -> "Snode":
@@ -127,24 +128,45 @@ class Snode:
def get_up(self) -> "Snode": def get_up(self) -> "Snode":
return self._up return self._up
def make_predictor(self): def make_predictor(self, num_classes: int) -> None:
"""Compute the class of the predictor and its belief based on the """Compute the class of the predictor and its belief based on the
subdataset of the node only if it is a leaf subdataset of the node only if it is a leaf
""" """
if not self.is_leaf(): if not self.is_leaf():
return return
classes, card = np.unique(self._y, return_counts=True) classes, card = np.unique(self._y, return_counts=True)
if len(classes) > 1: self._proba = np.zeros((num_classes,), dtype=np.int64)
for c, n in zip(classes, card):
self._proba[c] = n
try:
max_card = max(card) max_card = max(card)
self._class = classes[card == max_card][0] self._class = classes[card == max_card][0]
self._belief = max_card / np.sum(card) self._belief = max_card / np.sum(card)
else: except ValueError:
self._belief = 1
try:
self._class = classes[0]
except IndexError:
self._class = None self._class = None
def graph(self) -> str:
    """Return the graphviz (DOT) statements that describe this node.

    A leaf produces a single filled-box node statement showing its
    predicted class, its impurity and its per-class counts. Any other
    node produces its own node statement (number of features, classes
    present, samples per class and total samples) followed by one edge
    statement towards each of its two children.

    Returns
    -------
    str
        graphviz statements for the node, each terminated by a newline
    """
    name = f"N{id(self)}"
    if self.is_leaf():
        return (
            f'{name} [shape=box style=filled label="'
            f"class={self._class} impurity={self._impurity:.3f} "
            f'counts={self._proba}"];\n'
        )
    # The class distribution is only displayed for internal nodes, so it
    # is computed here instead of for every node of the tree
    classes, counts = np.unique(self._y, return_counts=True)
    return (
        f'{name} [label="#features={len(self._features)} '
        f"classes={classes} samples={counts} "
        f'({counts.sum()})" fontcolor=black];\n'
        f"{name} -> N{id(self.get_up())} [color=black];\n"
        f"{name} -> N{id(self.get_down())} [color=black];\n"
    )
def __str__(self) -> str: def __str__(self) -> str:
count_values = np.unique(self._y, return_counts=True) count_values = np.unique(self._y, return_counts=True)
if self.is_leaf(): if self.is_leaf():

View File

@@ -314,7 +314,7 @@ class Stree(BaseEstimator, ClassifierMixin):
if np.unique(y).shape[0] == 1: if np.unique(y).shape[0] == 1:
# only 1 class => pure dataset # only 1 class => pure dataset
node.set_title(title + ", <pure>") node.set_title(title + ", <pure>")
node.make_predictor() node.make_predictor(self.n_classes_)
return node return node
# Train the model # Train the model
clf = self._build_clf() clf = self._build_clf()
@@ -333,7 +333,7 @@ class Stree(BaseEstimator, ClassifierMixin):
if X_U is None or X_D is None: if X_U is None or X_D is None:
# didn't part anything # didn't part anything
node.set_title(title + ", <cgaf>") node.set_title(title + ", <cgaf>")
node.make_predictor() node.make_predictor(self.n_classes_)
return node return node
node.set_up( node.set_up(
self._train(X_U, y_u, sw_u, depth + 1, title + f" - Up({depth+1})") self._train(X_U, y_u, sw_u, depth + 1, title + f" - Up({depth+1})")
@@ -367,28 +367,66 @@ class Stree(BaseEstimator, ClassifierMixin):
) )
) )
@staticmethod def __predict_class(self, X: np.array) -> np.array:
def _reorder_results(y: np.array, indices: np.array) -> np.array: def compute_prediction(xp, indices, node):
"""Reorder an array based on the array of indices passed if xp is None:
return
if node.is_leaf():
# set a class for indices
result[indices] = node._proba
return
self.splitter_.partition(xp, node, train=False)
x_u, x_d = self.splitter_.part(xp)
i_u, i_d = self.splitter_.part(indices)
compute_prediction(x_u, i_u, node.get_up())
compute_prediction(x_d, i_d, node.get_down())
# setup prediction & make it happen
result = np.zeros((X.shape[0], self.n_classes_))
indices = np.arange(X.shape[0])
compute_prediction(X, indices, self.tree_)
return result
def check_predict(self, X) -> np.array:
    """Validate the samples passed to the prediction methods.

    Parameters
    ----------
    X : dataset of samples.

    Returns
    -------
    np.array
        the dataset as returned by check_array

    Raises
    ------
    ValueError
        if the number of columns of X differs from n_features_
    """
    # fails if the estimator has no tree_ attribute yet
    check_is_fitted(self, ["tree_"])
    # Input validation
    X = check_array(X)
    n_received = X.shape[1]
    if n_received == self.n_features_:
        return X
    raise ValueError(
        f"Expected {self.n_features_} features but got ({n_received})"
    )
def predict_proba(self, X: np.array) -> np.array:
"""Predict class probabilities of the input samples X.
The predicted class probability is the fraction of samples of the same
class in a leaf.
Parameters Parameters
---------- ----------
y : np.array X : dataset of samples.
data untidy
indices : np.array
indices used to set order
Returns Returns
------- -------
np.array proba : array of shape (n_samples, n_classes)
array y ordered The class probabilities of the input samples.
Raises
------
ValueError
if dataset with inconsistent number of features
NotFittedError
if model is not fitted
""" """
# return array of same type given in y
y_ordered = y.copy() X = self.check_predict(X)
indices = indices.astype(int) # return # of samples of each class in leaf node
for i, index in enumerate(indices): values = self.__predict_class(X)
y_ordered[index] = y[i] normalizer = values.sum(axis=1)[:, np.newaxis]
return y_ordered normalizer[normalizer == 0.0] = 1.0
return values / normalizer
def predict(self, X: np.array) -> np.array: def predict(self, X: np.array) -> np.array:
"""Predict labels for each sample in dataset passed """Predict labels for each sample in dataset passed
@@ -410,40 +448,8 @@ class Stree(BaseEstimator, ClassifierMixin):
NotFittedError NotFittedError
if model is not fitted if model is not fitted
""" """
X = self.check_predict(X)
def predict_class( return self.classes_[np.argmax(self.__predict_class(X), axis=1)]
xp: np.array, indices: np.array, node: Snode
) -> np.array:
if xp is None:
return [], []
if node.is_leaf():
# set a class for every sample in dataset
prediction = np.full((xp.shape[0], 1), node._class)
return prediction, indices
self.splitter_.partition(xp, node, train=False)
x_u, x_d = self.splitter_.part(xp)
i_u, i_d = self.splitter_.part(indices)
prx_u, prin_u = predict_class(x_u, i_u, node.get_up())
prx_d, prin_d = predict_class(x_d, i_d, node.get_down())
return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
# sklearn check
check_is_fitted(self, ["tree_"])
# Input validation
X = check_array(X)
if X.shape[1] != self.n_features_:
raise ValueError(
f"Expected {self.n_features_} features but got "
f"({X.shape[1]})"
)
# setup prediction & make it happen
indices = np.arange(X.shape[0])
result = (
self._reorder_results(*predict_class(X, indices, self.tree_))
.astype(int)
.ravel()
)
return self.classes_[result]
def nodes_leaves(self) -> tuple: def nodes_leaves(self) -> tuple:
"""Compute the number of nodes and leaves in the built tree """Compute the number of nodes and leaves in the built tree
@@ -476,6 +482,23 @@ class Stree(BaseEstimator, ClassifierMixin):
tree = None tree = None
return Siterator(tree) return Siterator(tree)
def graph(self, title="") -> str:
    """Graphviz code representing the tree

    Parameters
    ----------
    title : str, optional
        text appended to "STree" in the label of the graph, by default ""

    Returns
    -------
    str
        graphviz code
    """
    header = (
        "digraph STree {\nlabel=<STree "
        f"{title}>\nfontsize=30\nfontcolor=blue\nlabelloc=t\n"
    )
    # every node visited while iterating the tree adds its own statements
    body = "".join(node.graph() for node in self)
    return header + body + "}\n"
def __str__(self) -> str: def __str__(self) -> str:
"""String representation of the tree """String representation of the tree

View File

@@ -1 +1 @@
__version__ = "1.2.3" __version__ = "1.2.4"

View File

@@ -67,10 +67,28 @@ class Snode_test(unittest.TestCase):
def test_make_predictor_on_leaf(self): def test_make_predictor_on_leaf(self):
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test") test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
test.make_predictor() test.make_predictor(2)
self.assertEqual(1, test._class) self.assertEqual(1, test._class)
self.assertEqual(0.75, test._belief) self.assertEqual(0.75, test._belief)
self.assertEqual(-1, test._partition_column) self.assertEqual(-1, test._partition_column)
self.assertListEqual([1, 3], test._proba.tolist())
def test_make_predictor_on_not_leaf(self):
    """Check that make_predictor leaves a node with a child untouched."""
    parent = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
    child = Snode(None, [1], [1], [], 0.0, "another_test")
    parent.set_up(child)
    parent.make_predictor(2)
    # no class, belief or class counts are computed for the parent
    self.assertIsNone(parent._class)
    self.assertEqual(0, parent._belief)
    self.assertEqual(-1, parent._partition_column)
    self.assertEqual(-1, parent.get_up()._partition_column)
    self.assertIsNone(parent._proba)
def test_make_predictor_on_leaf_bogus_data(self):
    """Check make_predictor on a leaf that has no labels at all."""
    leaf = Snode(None, [1, 2, 3, 4], [], [], 0.0, "test")
    leaf.make_predictor(2)
    # without labels there is no class and every class count stays at 0
    self.assertIsNone(leaf._class)
    self.assertEqual(-1, leaf._partition_column)
    self.assertListEqual([0, 0], leaf._proba.tolist())
def test_set_title(self): def test_set_title(self):
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test") test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
@@ -97,21 +115,6 @@ class Snode_test(unittest.TestCase):
test.set_features([1, 2]) test.set_features([1, 2])
self.assertListEqual([1, 2], test.get_features()) self.assertListEqual([1, 2], test.get_features())
def test_make_predictor_on_not_leaf(self):
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
test.set_up(Snode(None, [1], [1], [], 0.0, "another_test"))
test.make_predictor()
self.assertIsNone(test._class)
self.assertEqual(0, test._belief)
self.assertEqual(-1, test._partition_column)
self.assertEqual(-1, test.get_up()._partition_column)
def test_make_predictor_on_leaf_bogus_data(self):
test = Snode(None, [1, 2, 3, 4], [], [], 0.0, "test")
test.make_predictor()
self.assertIsNone(test._class)
self.assertEqual(-1, test._partition_column)
def test_copy_node(self): def test_copy_node(self):
px = [1, 2, 3, 4] px = [1, 2, 3, 4]
py = [1] py = [1]

View File

@@ -115,6 +115,38 @@ class Stree_test(unittest.TestCase):
yp = clf.fit(X, y).predict(X[:num, :]) yp = clf.fit(X, y).predict(X[:num, :])
self.assertListEqual(y[:num].tolist(), yp.tolist()) self.assertListEqual(y[:num].tolist(), yp.tolist())
def test_multiple_predict_proba(self):
    """Check predict_proba of some samples with several kernels."""
    # expected[kernel][sample index] -> probabilities of each class
    expected = {
        "liblinear": {
            0: [0.02401129943502825, 0.9759887005649718],
            17: [0.9282970550576184, 0.07170294494238157],
        },
        "linear": {
            0: [0.029329608938547486, 0.9706703910614525],
            17: [0.9298469387755102, 0.07015306122448979],
        },
        "rbf": {
            0: [0.023448275862068966, 0.976551724137931],
            17: [0.9458064516129032, 0.05419354838709677],
        },
        "poly": {
            0: [0.01601164483260553, 0.9839883551673945],
            17: [0.9089790897908979, 0.0910209102091021],
        },
    }
    X, y = load_dataset(self._random_state)
    for kernel, samples in expected.items():
        # liblinear is the only kernel tested with the ovr strategy
        strategy = "ovr" if kernel == "liblinear" else "ovo"
        clf = Stree(
            kernel=kernel,
            multiclass_strategy=strategy,
            random_state=self._random_state,
        )
        computed = clf.fit(X, y).predict_proba(X)
        for index, probabilities in samples.items():
            for exp, comp in zip(probabilities, computed[index]):
                self.assertAlmostEqual(exp, comp)
def test_single_vs_multiple_prediction(self): def test_single_vs_multiple_prediction(self):
"""Check if predicting sample by sample gives the same result as """Check if predicting sample by sample gives the same result as
predicting all samples at once predicting all samples at once
@@ -358,6 +390,7 @@ class Stree_test(unittest.TestCase):
# Tests of score # Tests of score
def test_score_binary(self): def test_score_binary(self):
"""Check score for binary classification."""
X, y = load_dataset(self._random_state) X, y = load_dataset(self._random_state)
accuracies = [ accuracies = [
0.9506666666666667, 0.9506666666666667,
@@ -380,6 +413,7 @@ class Stree_test(unittest.TestCase):
self.assertAlmostEqual(accuracy_expected, accuracy_score) self.assertAlmostEqual(accuracy_expected, accuracy_score)
def test_score_max_features(self): def test_score_max_features(self):
"""Check score using max_features."""
X, y = load_dataset(self._random_state) X, y = load_dataset(self._random_state)
clf = Stree( clf = Stree(
kernel="liblinear", kernel="liblinear",
@@ -391,6 +425,7 @@ class Stree_test(unittest.TestCase):
self.assertAlmostEqual(0.9453333333333334, clf.score(X, y)) self.assertAlmostEqual(0.9453333333333334, clf.score(X, y))
def test_bogus_splitter_parameter(self): def test_bogus_splitter_parameter(self):
"""Check that bogus splitter parameter raises exception."""
clf = Stree(splitter="duck") clf = Stree(splitter="duck")
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
clf.fit(*load_dataset()) clf.fit(*load_dataset())
@@ -446,6 +481,7 @@ class Stree_test(unittest.TestCase):
self.assertListEqual([47], resdn[1].tolist()) self.assertListEqual([47], resdn[1].tolist())
def test_score_multiclass_rbf(self): def test_score_multiclass_rbf(self):
"""Test score for multiclass classification with rbf kernel."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -463,6 +499,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(1.0, clf2.fit(X, y).score(X, y)) self.assertEqual(1.0, clf2.fit(X, y).score(X, y))
def test_score_multiclass_poly(self): def test_score_multiclass_poly(self):
"""Test score for multiclass classification with poly kernel."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -484,6 +521,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(1.0, clf2.fit(X, y).score(X, y)) self.assertEqual(1.0, clf2.fit(X, y).score(X, y))
def test_score_multiclass_liblinear(self): def test_score_multiclass_liblinear(self):
"""Test score for multiclass classification with liblinear kernel."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -509,6 +547,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(1.0, clf2.fit(X, y).score(X, y)) self.assertEqual(1.0, clf2.fit(X, y).score(X, y))
def test_score_multiclass_sigmoid(self): def test_score_multiclass_sigmoid(self):
"""Test score for multiclass classification with sigmoid kernel."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -529,6 +568,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(0.9662921348314607, clf2.fit(X, y).score(X, y)) self.assertEqual(0.9662921348314607, clf2.fit(X, y).score(X, y))
def test_score_multiclass_linear(self): def test_score_multiclass_linear(self):
"""Test score for multiclass classification with linear kernel."""
warnings.filterwarnings("ignore", category=ConvergenceWarning) warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning) warnings.filterwarnings("ignore", category=RuntimeWarning)
X, y = load_dataset( X, y = load_dataset(
@@ -556,11 +596,13 @@ class Stree_test(unittest.TestCase):
self.assertEqual(1.0, clf2.fit(X, y).score(X, y)) self.assertEqual(1.0, clf2.fit(X, y).score(X, y))
def test_zero_all_sample_weights(self): def test_zero_all_sample_weights(self):
"""Test exception raises when all sample weights are zero."""
X, y = load_dataset(self._random_state) X, y = load_dataset(self._random_state)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
Stree().fit(X, y, np.zeros(len(y))) Stree().fit(X, y, np.zeros(len(y)))
def test_mask_samples_weighted_zero(self): def test_mask_samples_weighted_zero(self):
"""Check that the weighted zero samples are masked."""
X = np.array( X = np.array(
[ [
[1, 1], [1, 1],
@@ -588,6 +630,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(model2.score(X, y, w), 1) self.assertEqual(model2.score(X, y, w), 1)
def test_depth(self): def test_depth(self):
"""Check depth of the tree."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -603,6 +646,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(4, clf.depth_) self.assertEqual(4, clf.depth_)
def test_nodes_leaves(self): def test_nodes_leaves(self):
"""Check number of nodes and leaves."""
X, y = load_dataset( X, y = load_dataset(
random_state=self._random_state, random_state=self._random_state,
n_classes=3, n_classes=3,
@@ -622,6 +666,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(6, leaves) self.assertEqual(6, leaves)
def test_nodes_leaves_artificial(self): def test_nodes_leaves_artificial(self):
"""Check leaves of artificial dataset."""
n1 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test1") n1 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test1")
n2 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test2") n2 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test2")
n3 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test3") n3 = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test3")
@@ -640,12 +685,14 @@ class Stree_test(unittest.TestCase):
self.assertEqual(2, leaves) self.assertEqual(2, leaves)
def test_bogus_multiclass_strategy(self): def test_bogus_multiclass_strategy(self):
"""Check invalid multiclass strategy."""
clf = Stree(multiclass_strategy="other") clf = Stree(multiclass_strategy="other")
X, y = load_wine(return_X_y=True) X, y = load_wine(return_X_y=True)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
clf.fit(X, y) clf.fit(X, y)
def test_multiclass_strategy(self): def test_multiclass_strategy(self):
"""Check multiclass strategy."""
X, y = load_wine(return_X_y=True) X, y = load_wine(return_X_y=True)
clf_o = Stree(multiclass_strategy="ovo") clf_o = Stree(multiclass_strategy="ovo")
clf_r = Stree(multiclass_strategy="ovr") clf_r = Stree(multiclass_strategy="ovr")
@@ -655,6 +702,7 @@ class Stree_test(unittest.TestCase):
self.assertEqual(0.9269662921348315, score_r) self.assertEqual(0.9269662921348315, score_r)
def test_incompatible_hyperparameters(self): def test_incompatible_hyperparameters(self):
"""Check incompatible hyperparameters."""
X, y = load_wine(return_X_y=True) X, y = load_wine(return_X_y=True)
clf = Stree(kernel="liblinear", multiclass_strategy="ovo") clf = Stree(kernel="liblinear", multiclass_strategy="ovo")
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
@@ -664,5 +712,48 @@ class Stree_test(unittest.TestCase):
clf.fit(X, y) clf.fit(X, y)
def test_version(self): def test_version(self):
"""Check STree version."""
clf = Stree() clf = Stree()
self.assertEqual(__version__, clf.version()) self.assertEqual(__version__, clf.version())
def test_graph(self):
    """Check graphviz representation of the tree."""
    head = (
        "digraph STree {\nlabel=<STree >\nfontsize=30\n"
        "fontcolor=blue\nlabelloc=t\n"
    )
    tail = (
        ' [shape=box style=filled label="class=1 impurity=0.000 '
        'counts=[0 1 0]"];\n}\n'
    )
    X, y = load_wine(return_X_y=True)
    clf = Stree(random_state=self._random_state)
    # before fitting the graph only has the header and the closing brace
    self.assertEqual(clf.graph(), head + "}\n")
    clf.fit(X, y)
    computed = clf.graph()
    # node names come from id() so only both ends of the output are
    # compared
    self.assertEqual(computed[: len(head)], head)
    self.assertEqual(computed[-len(tail):], tail)
def test_graph_title(self):
    """Check graphviz representation of the tree using a title."""
    title = "Sample title"
    head = (
        "digraph STree {\nlabel=<STree Sample title>\nfontsize=30\n"
        "fontcolor=blue\nlabelloc=t\n"
    )
    tail = (
        ' [shape=box style=filled label="class=1 impurity=0.000 '
        'counts=[0 1 0]"];\n}\n'
    )
    X, y = load_wine(return_X_y=True)
    clf = Stree(random_state=self._random_state)
    # before fitting the graph only has the header and the closing brace
    self.assertEqual(clf.graph(title), head + "}\n")
    clf.fit(X, y)
    computed = clf.graph(title)
    # node names come from id() so only both ends of the output are
    # compared
    self.assertEqual(computed[: len(head)], head)
    self.assertEqual(computed[-len(tail):], tail)