Compare commits

...

10 Commits

Author SHA1 Message Date
d1e30a3372 Refactor predict and score and make mypy --strict 2020-07-01 18:37:10 +02:00
fa001f97a4 First Approach 2020-06-28 02:46:20 +02:00
be552fdd6c Add test for getting 3 feature_sets in Splitter
Add ensemble notebook
2020-06-28 02:45:08 +02:00
5e3a8e3ec5 Change adaboost notebook 2020-06-27 23:34:15 +02:00
554ec03c32 Get only 3 sets for best split
Fix flaky test in Splitter_test
2020-06-27 18:29:40 +02:00
4b7e4a3fb0 better solution to the sklearn bagging problem
Add better tests
enhance .coveragerc
2020-06-26 11:22:45 +02:00
76723993fd Solve Warning class label not found when bagging 2020-06-25 13:07:50 +02:00
ecd0b86f4d Solve the mistake of min and max distance
The split criteria functions min and max distance return classes, while
max_samples returns positive and negative distances to the hyperplane of
the class with more samples in the node
2020-06-17 00:13:52 +02:00
3e52a4746c Fix entropy and information_gain functions 2020-06-16 13:56:02 +02:00
Ricardo Montañana Gómez
a20e45e8e7 Merge pull request #10 from Doctorado-ML/add_subspaces
#2 Add subspaces
2020-06-15 11:30:53 +02:00
10 changed files with 665 additions and 321 deletions

View File

@@ -10,5 +10,4 @@ exclude_lines =
if __name__ == .__main__.:
ignore_errors = True
omit =
stree/tests/*
stree/__init__.py

4
.gitignore vendored
View File

@@ -130,4 +130,6 @@ dmypy.json
.idea
.vscode
.pre-commit-config.yaml
.pre-commit-config.yaml
**.csv

View File

@@ -4,7 +4,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"# Test AdaBoost with different configurations"
"# Test Stree with AdaBoost and Bagging with different configurations"
]
},
{
@@ -34,11 +34,8 @@
"outputs": [],
"source": [
"import time\n",
"from sklearn.ensemble import AdaBoostClassifier\n",
"from sklearn.tree import DecisionTreeClassifier\n",
"from sklearn.svm import LinearSVC, SVC\n",
"from sklearn.model_selection import GridSearchCV, train_test_split\n",
"from sklearn.datasets import load_iris\n",
"from sklearn.ensemble import AdaBoostClassifier, BaggingClassifier\n",
"from sklearn.model_selection import train_test_split\n",
"from stree import Stree"
]
},
@@ -57,12 +54,14 @@
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"metadata": {
"tags": []
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "Fraud: 0.173% 492\nValid: 99.827% 284315\nX.shape (100492, 28) y.shape (100492,)\nFraud: 0.659% 662\nValid: 99.341% 99830\n"
"text": "Fraud: 0.173% 492\nValid: 99.827% 284315\nX.shape (100492, 28) y.shape (100492,)\nFraud: 0.644% 647\nValid: 99.356% 99845\n"
}
],
"source": [
@@ -117,18 +116,20 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## STree alone on the whole dataset and linear kernel"
"## STree alone with 100.000 samples and linear kernel"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"metadata": {
"tags": []
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "Score Train: 0.9985499829409757\nScore Test: 0.998407854584052\nTook 39.45 seconds\n"
"text": "Score Train: 0.9985784146480154\nScore Test: 0.9981093273185617\nTook 73.27 seconds\n"
}
],
"source": [
@@ -144,7 +145,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Different kernels with different configuations"
"## Adaboost"
]
},
{
@@ -161,18 +162,20 @@
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"metadata": {
"tags": []
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "Kernel: linear\tTime: 87.00 seconds\tScore Train: 0.9982372\tScore Test: 0.9981425\nKernel: rbf\tTime: 60.60 seconds\tScore Train: 0.9934181\tScore Test: 0.9933992\nKernel: poly\tTime: 88.08 seconds\tScore Train: 0.9937450\tScore Test: 0.9938968\n"
"text": "Kernel: linear\tTime: 93.78 seconds\tScore Train: 0.9983083\tScore Test: 0.9983083\nKernel: rbf\tTime: 18.32 seconds\tScore Train: 0.9935602\tScore Test: 0.9935651\nKernel: poly\tTime: 69.68 seconds\tScore Train: 0.9973132\tScore Test: 0.9972801\n"
}
],
"source": [
"for kernel in ['linear', 'rbf', 'poly']:\n",
" now = time.time()\n",
" clf = AdaBoostClassifier(Stree(C=7, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state)\n",
" clf = AdaBoostClassifier(base_estimator=Stree(C=C, kernel=kernel, max_depth=max_depth, random_state=random_state), algorithm=\"SAMME\", n_estimators=n_estimators, random_state=random_state)\n",
" clf.fit(Xtrain, ytrain)\n",
" score_train = clf.score(Xtrain, ytrain)\n",
" score_test = clf.score(Xtest, ytest)\n",
@@ -183,24 +186,37 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test algorithm SAMME in AdaBoost to check speed/accuracy"
"## Bagging"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"n_estimators = 10\n",
"C = 7\n",
"max_depth = 3"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"tags": []
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": "Kernel: linear\tTime: 58.75 seconds\tScore Train: 0.9980524\tScore Test: 0.9978771\nKernel: rbf\tTime: 12.49 seconds\tScore Train: 0.9934181\tScore Test: 0.9933992\nKernel: poly\tTime: 97.85 seconds\tScore Train: 0.9972137\tScore Test: 0.9971806\n"
"text": "Kernel: linear\tTime: 387.06 seconds\tScore Train: 0.9985784\tScore Test: 0.9981093\nKernel: rbf\tTime: 144.00 seconds\tScore Train: 0.9992750\tScore Test: 0.9983415\nKernel: poly\tTime: 101.78 seconds\tScore Train: 0.9992466\tScore Test: 0.9981757\n"
}
],
"source": [
"for kernel in ['linear', 'rbf', 'poly']:\n",
" now = time.time()\n",
" clf = AdaBoostClassifier(Stree(C=7, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state, algorithm=\"SAMME\")\n",
" clf = BaggingClassifier(base_estimator=Stree(C=C, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state)\n",
" clf.fit(Xtrain, ytrain)\n",
" score_train = clf.score(Xtrain, ytrain)\n",
" score_test = clf.score(Xtest, ytest)\n",
@@ -223,7 +239,7 @@
},
"orig_nbformat": 2,
"kernelspec": {
"name": "python37664bitgeneralvenvfbd0a23e74cf4e778460f5ffc6761f39",
"name": "python37664bitgeneralvenve3128601eb614c5da59c5055670b6040",
"display_name": "Python 3.7.6 64-bit ('general': venv)"
}
},

File diff suppressed because one or more lines are too long

View File

@@ -6,24 +6,24 @@ __version__ = "0.9"
Build an oblique tree classifier based on SVM Trees
"""
from __future__ import annotations
import os
import numbers
import random
import warnings
from typing import Optional, List, Union, Tuple
from math import log
from itertools import combinations
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.svm import SVC, LinearSVC
from sklearn.utils import check_consistent_length
from sklearn.utils.multiclass import check_classification_targets
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.validation import (
check_X_y,
check_array,
import numpy as np # type: ignore
from sklearn.base import BaseEstimator, ClassifierMixin # type: ignore
from sklearn.svm import SVC, LinearSVC # type: ignore
from sklearn.utils.multiclass import ( # type: ignore
check_classification_targets,
)
from sklearn.exceptions import ConvergenceWarning # type: ignore
from sklearn.utils.validation import ( # type: ignore
check_is_fitted,
_check_sample_weight,
)
from sklearn.metrics._classification import _weighted_sum, _check_targets
class Snode:
@@ -33,29 +33,33 @@ class Snode:
def __init__(
self,
clf: SVC,
clf: Union[SVC, LinearSVC],
X: np.ndarray,
y: np.ndarray,
features: np.array,
impurity: float,
title: str,
weight: np.ndarray = None,
):
self._clf = clf
self._title = title
self._belief = 0.0
self._clf: Union[SVC, LinearSVC] = clf
self._title: str = title
self._belief: float = 0.0
# Only store dataset in Testing
self._X = X if os.environ.get("TESTING", "NS") != "NS" else None
self._y = y
self._down = None
self._up = None
self._X: Optional[np.array] = X if os.environ.get(
"TESTING", "NS"
) != "NS" else None
self._y: np.array = y
self._down: Optional[Snode] = None
self._up: Optional[Snode] = None
self._class = None
self._feature = None
self._sample_weight = None
self._features = features
self._impurity = impurity
self._sample_weight: Optional[np.array] = (
weight if os.environ.get("TESTING", "NS") != "NS" else None
)
self._features: Tuple[int, ...] = features
self._impurity: float = impurity
@classmethod
def copy(cls, node: "Snode") -> "Snode":
def copy(cls, node: Snode) -> Snode:
return cls(
node._clf,
node._X,
@@ -65,22 +69,22 @@ class Snode:
node._title,
)
def set_down(self, son):
def set_down(self, son: Snode) -> None:
self._down = son
def set_up(self, son):
def set_up(self, son: Snode) -> None:
self._up = son
def is_leaf(self) -> bool:
return self._up is None and self._down is None
def get_down(self) -> "Snode":
def get_down(self) -> Optional[Snode]:
return self._down
def get_up(self) -> "Snode":
def get_up(self) -> Optional[Snode]:
return self._up
def make_predictor(self):
def make_predictor(self) -> None:
"""Compute the class of the predictor and its belief based on the
subdataset of the node only if it is a leaf
"""
@@ -119,11 +123,11 @@ class Siterator:
"""Stree preorder iterator
"""
def __init__(self, tree: Snode):
self._stack = []
def __init__(self, tree: Optional[Snode]):
self._stack: List[Snode] = []
self._push(tree)
def _push(self, node: Snode):
def _push(self, node: Optional[Snode]) -> None:
if node is not None:
self._stack.append(node)
@@ -139,21 +143,21 @@ class Siterator:
class Splitter:
def __init__(
self,
clf: SVC = None,
criterion: str = None,
splitter_type: str = None,
criteria: str = None,
min_samples_split: int = None,
random_state=None,
clf: Union[SVC, LinearSVC] = None,
criterion: str = "",
splitter_type: str = "",
criteria: str = "",
min_samples_split: int = 0,
random_state: Optional[int] = None,
):
self._clf = clf
self._random_state = random_state
self._clf: Union[SVC, LinearSVC] = clf
self._random_state: Optional[int] = random_state
if random_state is not None:
random.seed(random_state)
self._criterion = criterion
self._min_samples_split = min_samples_split
self._criteria = criteria
self._splitter_type = splitter_type
self._criterion: str = criterion
self._min_samples_split: int = min_samples_split
self._criteria: str = criteria
self._splitter_type: str = splitter_type
if clf is None:
raise ValueError(f"clf has to be a sklearn estimator, got({clf})")
@@ -163,10 +167,10 @@ class Splitter:
f"criterion must be gini or entropy got({criterion})"
)
if criteria not in ["min_distance", "max_samples"]:
if criteria not in ["min_distance", "max_samples", "max_distance"]:
raise ValueError(
f"split_criteria has to be min_distance or \
max_samples got ({criteria})"
"split_criteria has to be min_distance "
f"max_distance or max_samples got ({criteria})"
)
if splitter_type not in ["random", "best"]:
@@ -182,29 +186,55 @@ class Splitter:
@staticmethod
def _gini(y: np.array) -> float:
_, count = np.unique(y, return_counts=True)
return 1 - np.sum(np.square(count / np.sum(count)))
return float(1 - np.sum(np.square(count / np.sum(count))))
@staticmethod
def _entropy(y: np.array) -> float:
_, count = np.unique(y, return_counts=True)
proportion = count / np.sum(count)
return -np.sum(proportion * np.log2(proportion))
n_labels = len(y)
if n_labels <= 1:
return 0
counts = np.bincount(y)
proportions = counts / n_labels
n_classes = np.count_nonzero(proportions)
if n_classes <= 1:
return 0
entropy = 0.0
# Compute standard entropy.
for prop in proportions:
if prop != 0.0:
entropy -= prop * log(prop, n_classes)
return entropy
def information_gain(
self, labels_up: np.array, labels_dn: np.array
self, labels: np.array, labels_up: np.array, labels_dn: np.array
) -> float:
card_up = labels_up.shape[0] if labels_up is not None else 0
card_dn = labels_dn.shape[0] if labels_dn is not None else 0
imp_prev = self.criterion_function(labels)
card_up = card_dn = imp_up = imp_dn = 0
if labels_up is not None:
card_up = labels_up.shape[0]
imp_up = self.criterion_function(labels_up)
if labels_dn is not None:
card_dn = labels_dn.shape[0] if labels_dn is not None else 0
imp_dn = self.criterion_function(labels_dn)
samples = card_up + card_dn
up = card_up / samples * self.criterion_function(labels_up)
dn = card_dn / samples * self.criterion_function(labels_dn)
return up + dn
if samples == 0:
return 0.0
else:
result = float(
imp_prev
- (card_up / samples) * imp_up
- (card_dn / samples) * imp_dn
)
return result
def _select_best_set(
self, dataset: np.array, labels: np.array, features_sets: list
) -> list:
min_impurity = 1
selected = None
self,
dataset: np.array,
labels: np.array,
features_sets: List[Tuple[int, ...]],
) -> Tuple[int, ...]:
max_gain: float = 0.0
selected: Union[Tuple[int, ...], None] = None
warnings.filterwarnings("ignore", category=ConvergenceWarning)
for feature_set in features_sets:
self._clf.fit(dataset[:, feature_set], labels)
@@ -213,11 +243,11 @@ class Splitter:
)
self.partition(dataset, node)
y1, y2 = self.part(labels)
impurity = self.information_gain(y1, y2)
if impurity < min_impurity:
min_impurity = impurity
gain = self.information_gain(labels, y1, y2)
if gain > max_gain:
max_gain = gain
selected = feature_set
return selected
return selected if selected is not None else feature_set
def _get_subspaces_set(
self, dataset: np.array, labels: np.array, max_features: int
@@ -226,37 +256,76 @@ class Splitter:
features_sets = list(combinations(features, max_features))
if len(features_sets) > 1:
if self._splitter_type == "random":
return features_sets[random.randint(0, len(features_sets) - 1)]
index = random.randint(0, len(features_sets) - 1)
return features_sets[index]
else:
# get only 3 sets at most
if len(features_sets) > 3:
features_sets = random.sample(features_sets, 3)
return self._select_best_set(dataset, labels, features_sets)
else:
return features_sets[0]
def get_subspace(
self, dataset: np.array, labels: np.array, max_features: int
) -> list:
) -> Tuple[np.array, np.array]:
"""Return the best subspace to make a split
"""
indices = self._get_subspaces_set(dataset, labels, max_features)
return dataset[:, indices], indices
@staticmethod
def _min_distance(data: np.array, _) -> np.array:
# chooses the lowest distance of every sample
indices = np.argmin(np.abs(data), axis=1)
return np.array(
[data[x, y] for x, y in zip(range(len(data[:, 0])), indices)]
)
def _min_distance(data: np.array, _: np.array) -> np.array:
"""Assign class to min distances
return a vector of classes so partition can separate class 0 from
the rest of classes, ie. class 0 goes to one splitted node and the
rest of classes go to the other
:param data: distances to hyper plane of every class
:type data: np.array (m, n_classes)
:param _: enable call compat with other measures
:type _: None
:return: vector with the class assigned to each sample
:rtype: np.array shape (m,)
"""
return np.argmin(data, axis=1)
@staticmethod
def _max_distance(data: np.array, _: np.array) -> np.array:
"""Assign class to max distances
return a vector of classes so partition can separate class 0 from
the rest of classes, ie. class 0 goes to one splitted node and the
rest of classes go to the other
:param data: distances to hyper plane of every class
:type data: np.array (m, n_classes)
:param _: enable call compat with other measures
:type _: None
:return: vector with the class assigned to each sample values
(can be 0, 1, ...)
:rtype: np.array shape (m,)
"""
return np.argmax(data, axis=1)
@staticmethod
def _max_samples(data: np.array, y: np.array) -> np.array:
"""return distances of the class with more samples
:param data: distances to hyper plane of every class
:type data: np.array (m, n_classes)
:param y: vector of labels (classes)
:type y: np.array (m,)
:return: vector with distances to hyperplane (can be positive or neg.)
:rtype: np.array shape (m,)
"""
# select the class with max number of samples
_, samples = np.unique(y, return_counts=True)
selected = np.argmax(samples)
return data[:, selected]
def partition(self, samples: np.array, node: Snode):
"""Set the criteria to split arrays
def partition(self, samples: np.array, node: Snode) -> None:
"""Set the criteria to split arrays. Compute the indices of the samples
that should go to one side of the tree (down)
"""
data = self._distances(node, samples)
@@ -282,7 +351,7 @@ class Splitter:
"""
return node._clf.decision_function(data[:, node._features])
def part(self, origin: np.array) -> list:
def part(self, origin: np.array) -> Tuple[np.array, np.array]:
"""Split an array in two based on indices (down) and its complement
:param origin: dataset to split
@@ -293,13 +362,13 @@ class Splitter:
:rtype: list
"""
up = ~self._down
return [
return (
origin[up] if any(up) else None,
origin[self._down] if any(self._down) else None,
]
)
class Stree(BaseEstimator, ClassifierMixin):
class Stree(BaseEstimator, ClassifierMixin): # type: ignore
"""Estimator that is based on binary trees of svm nodes
can deal with sample_weights in predict, used in boosting sklearn methods
inheriting from BaseEstimator implements get_params and set_params methods
@@ -312,42 +381,34 @@ class Stree(BaseEstimator, ClassifierMixin):
C: float = 1.0,
kernel: str = "linear",
max_iter: int = 1000,
random_state: int = None,
max_depth: int = None,
random_state: Optional[int] = None,
max_depth: Optional[int] = None,
tol: float = 1e-4,
degree: int = 3,
gamma="scale",
gamma: Union[float, str] = "scale",
split_criteria: str = "max_samples",
criterion: str = "gini",
min_samples_split: int = 0,
max_features=None,
max_features: Optional[Union[str, int, float]] = None,
splitter: str = "random",
):
self.max_iter = max_iter
self.C = C
self.kernel = kernel
self.random_state = random_state
self.max_depth = max_depth
self.tol = tol
self.gamma = gamma
self.degree = degree
self.min_samples_split = min_samples_split
self.split_criteria = split_criteria
self.max_features = max_features
self.criterion = criterion
self.splitter = splitter
def _more_tags(self) -> dict:
"""Required by sklearn to supply features of the classifier
:return: the tag required
:rtype: dict
"""
return {"requires_y": True}
self.C: float = C
self.kernel: str = kernel
self.random_state: Optional[int] = random_state
self.max_depth: Optional[int] = max_depth
self.tol: float = tol
self.gamma: Union[float, str] = gamma
self.degree: int = degree
self.min_samples_split: int = min_samples_split
self.split_criteria: str = split_criteria
self.max_features: Union[str, int, float, None] = max_features
self.criterion: str = criterion
self.splitter: str = splitter
def fit(
self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None
) -> "Stree":
) -> Stree:
"""Build the tree based on the dataset of samples and its labels
:param X: dataset of samples to make predictions
@@ -376,11 +437,11 @@ class Stree(BaseEstimator, ClassifierMixin):
f"Maximum depth has to be greater than 1... got (max_depth=\
{self.max_depth})"
)
check_classification_targets(y)
X, y = check_X_y(X, y)
sample_weight = _check_sample_weight(sample_weight, X)
check_classification_targets(y)
X, y = self._validate_data(X, y)
sample_weight = _check_sample_weight(
sample_weight, X, dtype=np.float64
)
# Initialize computed parameters
self.splitter_ = Splitter(
clf=self._build_clf(),
@@ -396,8 +457,6 @@ class Stree(BaseEstimator, ClassifierMixin):
self.n_classes_ = self.classes_.shape[0]
self.n_iter_ = self.max_iter
self.depth_ = 0
self.n_features_ = X.shape[1]
self.n_features_in_ = X.shape[1]
self.max_features_ = self._initialize_max_features()
self.tree_ = self.train(X, y, sample_weight, 1, "root")
self._build_predictor()
@@ -410,7 +469,7 @@ class Stree(BaseEstimator, ClassifierMixin):
sample_weight: np.ndarray,
depth: int,
title: str,
) -> Snode:
) -> Optional[Snode]:
"""Recursive function to split the original dataset into predictor
nodes (leaves)
@@ -439,13 +498,22 @@ class Stree(BaseEstimator, ClassifierMixin):
features=X.shape[1],
impurity=0.0,
title=title + ", <pure>",
weight=sample_weight,
)
# Train the model
clf = self._build_clf()
Xs, features = self.splitter_.get_subspace(X, y, self.max_features_)
# solve WARNING: class label 0 specified in weight is not found
# in bagging
if any(sample_weight == 0):
indices = sample_weight == 0
y_next = y[~indices]
# touch weights if removing any class
if np.unique(y_next).shape[0] != self.n_classes_:
sample_weight += 1e-5
clf.fit(Xs, y, sample_weight=sample_weight)
impurity = self.splitter_.impurity(y)
node = Snode(clf, X, y, features, impurity, title)
node = Snode(clf, X, y, features, impurity, title, sample_weight)
self.depth_ = max(depth, self.depth_)
self.splitter_.partition(X, node)
X_U, X_D = self.splitter_.part(X)
@@ -460,16 +528,27 @@ class Stree(BaseEstimator, ClassifierMixin):
features=X.shape[1],
impurity=impurity,
title=title + ", <cgaf>",
weight=sample_weight,
)
node.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up"))
node.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down"))
node.set_up(
self.train( # type: ignore
X_U, y_u, sw_u, depth + 1, title + " - Up"
)
)
node.set_down(
self.train( # type: ignore
X_D, y_d, sw_d, depth + 1, title + " - Down"
)
)
return node
def _build_predictor(self):
def _build_predictor(self) -> None:
"""Process the leaves to make them predictors
"""
def run_tree(node: Snode):
def run_tree(node: Optional[Snode]) -> None:
if node is None:
raise ValueError("Can't build predictors on None")
if node.is_leaf():
node.make_predictor()
return
@@ -478,7 +557,7 @@ class Stree(BaseEstimator, ClassifierMixin):
run_tree(self.tree_)
def _build_clf(self):
def _build_clf(self) -> Union[LinearSVC, SVC]:
""" Build the correct classifier for the node
"""
return (
@@ -527,30 +606,30 @@ class Stree(BaseEstimator, ClassifierMixin):
"""
def predict_class(
xp: np.array, indices: np.array, node: Snode
xp: np.array, indices: np.array, node: Optional[Snode]
) -> np.array:
if xp is None:
return [], []
if node.is_leaf():
if node.is_leaf(): # type: ignore
# set a class for every sample in dataset
prediction = np.full((xp.shape[0], 1), node._class)
prediction = np.full(
(xp.shape[0], 1), node._class # type: ignore
)
return prediction, indices
self.splitter_.partition(xp, node)
self.splitter_.partition(xp, node) # type: ignore
x_u, x_d = self.splitter_.part(xp)
i_u, i_d = self.splitter_.part(indices)
prx_u, prin_u = predict_class(x_u, i_u, node.get_up())
prx_d, prin_d = predict_class(x_d, i_d, node.get_down())
prx_u, prin_u = predict_class(
x_u, i_u, node.get_up() # type: ignore
)
prx_d, prin_d = predict_class(
x_d, i_d, node.get_down() # type: ignore
)
return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
# sklearn check
check_is_fitted(self, ["tree_"])
check_is_fitted(self, "n_features_in_")
# Input validation
X = check_array(X)
if X.shape[1] != self.n_features_:
raise ValueError(
f"Expected {self.n_features_} features but got "
f"({X.shape[1]})"
)
X = self._validate_data(X, reset=False)
# setup prediction & make it happen
indices = np.arange(X.shape[0])
result = (
@@ -560,32 +639,6 @@ class Stree(BaseEstimator, ClassifierMixin):
)
return self.classes_[result]
def score(
self, X: np.array, y: np.array, sample_weight: np.array = None
) -> float:
"""Compute accuracy of the prediction
:param X: dataset of samples to make predictions
:type X: np.array
:param y_true: samples labels
:type y_true: np.array
:param sample_weight: weights of the samples. Rescale C per sample.
Hi' weights force the classifier to put more emphasis on these points
:type sample_weight: np.array optional
:return: accuracy of the prediction
:rtype: float
"""
# sklearn check
check_is_fitted(self)
check_classification_targets(y)
X, y = check_X_y(X, y)
y_pred = self.predict(X).reshape(y.shape)
# Compute accuracy for each possible representation
_, y_true, y_pred = _check_targets(y, y_pred)
check_consistent_length(y_true, y_pred, sample_weight)
score = y_true == y_pred
return _weighted_sum(score, sample_weight, normalize=True)
def __iter__(self) -> Siterator:
"""Create an iterator to be able to visit the nodes of the tree in
preorder, can make a list with all the nodes in preorder
@@ -613,11 +666,11 @@ class Stree(BaseEstimator, ClassifierMixin):
def _initialize_max_features(self) -> int:
if isinstance(self.max_features, str):
if self.max_features == "auto":
max_features = max(1, int(np.sqrt(self.n_features_)))
max_features = max(1, int(np.sqrt(self.n_features_in_)))
elif self.max_features == "sqrt":
max_features = max(1, int(np.sqrt(self.n_features_)))
max_features = max(1, int(np.sqrt(self.n_features_in_)))
elif self.max_features == "log2":
max_features = max(1, int(np.log2(self.n_features_)))
max_features = max(1, int(np.log2(self.n_features_in_)))
else:
raise ValueError(
"Invalid value for max_features. "
@@ -625,13 +678,13 @@ class Stree(BaseEstimator, ClassifierMixin):
"'sqrt' or 'log2'."
)
elif self.max_features is None:
max_features = self.n_features_
elif isinstance(self.max_features, numbers.Integral):
max_features = self.n_features_in_
elif isinstance(self.max_features, int):
max_features = self.max_features
else: # float
if self.max_features > 0.0:
max_features = max(
1, int(self.max_features * self.n_features_)
1, int(self.max_features * self.n_features_in_)
)
else:
raise ValueError(

View File

@@ -1,3 +1,4 @@
# type: ignore
import os
import unittest
@@ -33,10 +34,7 @@ class Snode_test(unittest.TestCase):
max_card = max(card)
min_card = min(card)
if len(classes) > 1:
try:
belief = max_card / (max_card + min_card)
except ZeroDivisionError:
belief = 0.0
belief = max_card / (max_card + min_card)
else:
belief = 1
self.assertEqual(belief, node._belief)

View File

@@ -1,11 +1,12 @@
# type: ignore
import os
import unittest
import random
import numpy as np
from sklearn.svm import LinearSVC
from sklearn.svm import SVC
from sklearn.datasets import load_wine, load_iris
from stree import Splitter
from .utils import load_dataset
class Splitter_test(unittest.TestCase):
@@ -15,7 +16,7 @@ class Splitter_test(unittest.TestCase):
@staticmethod
def build(
clf=LinearSVC(),
clf=SVC,
min_samples_split=0,
splitter_type="random",
criterion="gini",
@@ -23,7 +24,7 @@ class Splitter_test(unittest.TestCase):
random_state=None,
):
return Splitter(
clf=clf,
clf=clf(random_state=random_state, kernel="rbf"),
min_samples_split=min_samples_split,
splitter_type=splitter_type,
criterion=criterion,
@@ -43,10 +44,14 @@ class Splitter_test(unittest.TestCase):
with self.assertRaises(ValueError):
self.build(criteria="duck")
with self.assertRaises(ValueError):
self.build(clf=None)
_ = Splitter(clf=None)
for splitter_type in ["best", "random"]:
for criterion in ["gini", "entropy"]:
for criteria in ["min_distance", "max_samples"]:
for criteria in [
"min_distance",
"max_samples",
"max_distance",
]:
tcl = self.build(
splitter_type=splitter_type,
criterion=criterion,
@@ -57,30 +62,74 @@ class Splitter_test(unittest.TestCase):
self.assertEqual(criteria, tcl._criteria)
def test_gini(self):
y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
expected = 0.48
self.assertEqual(expected, Splitter._gini(y))
tcl = self.build(criterion="gini")
self.assertEqual(expected, tcl.criterion_function(y))
expected_values = [
([0, 1, 1, 1, 1, 1, 0, 0, 0, 1], 0.48),
([0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1], 0.7777777777777778),
([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 2, 2, 2], 0.520408163265306),
([0, 0, 1, 1, 1, 1, 0, 0], 0.5),
([0, 0, 1, 1, 2, 2, 3, 3], 0.75),
([0, 0, 1, 1, 1, 1, 1, 1], 0.375),
([0], 0),
([1, 1, 1, 1], 0),
]
for labels, expected in expected_values:
self.assertAlmostEqual(expected, Splitter._gini(labels))
tcl = self.build(criterion="gini")
self.assertAlmostEqual(expected, tcl.criterion_function(labels))
def test_entropy(self):
y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
expected = 0.9709505944546686
self.assertAlmostEqual(expected, Splitter._entropy(y))
tcl = self.build(criterion="entropy")
self.assertEqual(expected, tcl.criterion_function(y))
expected_values = [
([0, 1, 1, 1, 1, 1, 0, 0, 0, 1], 0.9709505944546686),
([0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1], 0.9111886696810589),
([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 2, 2, 2], 0.8120406807940999),
([0, 0, 1, 1, 1, 1, 0, 0], 1),
([0, 0, 1, 1, 2, 2, 3, 3], 1),
([0, 0, 1, 1, 1, 1, 1, 1], 0.8112781244591328),
([1], 0),
([0, 0, 0, 0], 0),
]
for labels, expected in expected_values:
self.assertAlmostEqual(expected, Splitter._entropy(labels))
tcl = self.build(criterion="entropy")
self.assertAlmostEqual(expected, tcl.criterion_function(labels))
def test_information_gain(self):
yu = np.array([0, 1, 1, 1, 1, 1])
yd = np.array([0, 0, 0, 1])
values_expected = [
("gini", 0.31666666666666665),
("entropy", 0.7145247027726656),
expected_values = [
(
[0, 1, 1, 1, 1, 1],
[0, 0, 0, 1],
0.16333333333333333,
0.25642589168200297,
),
(
[0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1],
[5, 3, 2, 1, 1],
0.007381776239907684,
-0.03328610916207225,
),
([], [], 0.0, 0.0),
([1], [], 0.0, 0.0),
([], [1], 0.0, 0.0),
([0, 0, 0, 0], [0, 0], 0.0, 0.0),
([], [1, 1, 1, 2], 0.0, 0.0),
(None, [1, 2, 3], 0.0, 0.0),
([1, 2, 3], None, 0.0, 0.0),
]
for criterion, expected in values_expected:
tcl = self.build(criterion=criterion)
computed = tcl.information_gain(yu, yd)
self.assertAlmostEqual(expected, computed)
for yu, yd, expected_gini, expected_entropy in expected_values:
yu = np.array(yu, dtype=np.int32) if yu is not None else None
yd = np.array(yd, dtype=np.int32) if yd is not None else None
if yu is not None and yd is not None:
complete = np.append(yu, yd)
elif yd is not None:
complete = yd
else:
complete = yu
tcl = self.build(criterion="gini")
computed = tcl.information_gain(complete, yu, yd)
self.assertAlmostEqual(expected_gini, computed)
tcl = self.build(criterion="entropy")
computed = tcl.information_gain(complete, yu, yd)
self.assertAlmostEqual(expected_entropy, computed)
def test_max_samples(self):
tcl = self.build(criteria="max_samples")
@@ -108,34 +157,73 @@ class Splitter_test(unittest.TestCase):
[0.1, 0.2, 0.3],
]
)
expected = np.array([-0.1, 0.01, 0.5, 0.1])
expected = np.array([2, 2, 1, 0])
computed = tcl._min_distance(data, None)
self.assertEqual((4,), computed.shape)
self.assertListEqual(expected.tolist(), computed.tolist())
def test_max_distance(self):
tcl = self.build(criteria="max_distance")
data = np.array(
[
[-0.1, 0.2, -0.3],
[0.7, 0.01, -0.1],
[0.7, -0.9, 0.5],
[0.1, 0.2, 0.3],
]
)
expected = np.array([1, 0, 0, 2])
computed = tcl._max_distance(data, None)
self.assertEqual((4,), computed.shape)
self.assertListEqual(expected.tolist(), computed.tolist())
def test_best_splitter_few_sets(self):
X, y = load_iris(return_X_y=True)
X = np.delete(X, 3, 1)
tcl = self.build(splitter_type="best", random_state=self._random_state)
dataset, computed = tcl.get_subspace(X, y, max_features=2)
self.assertListEqual([0, 2], list(computed))
self.assertListEqual(X[:, computed].tolist(), dataset.tolist())
def test_splitter_parameter(self):
expected_values = [
[1, 7, 9],
[1, 7, 9],
[1, 7, 9],
[1, 7, 9],
[0, 5, 6],
[0, 5, 6],
[0, 5, 6],
[0, 5, 6],
[2, 3, 5, 7], # best entropy min_distance
[0, 2, 4, 5], # best entropy max_samples
[0, 2, 8, 12], # best entropy max_distance
[1, 2, 5, 12], # best gini min_distance
[0, 3, 4, 10], # best gini max_samples
[1, 2, 9, 12], # best gini max_distance
[3, 9, 11, 12], # random entropy min_distance
[1, 5, 6, 9], # random entropy max_samples
[1, 2, 4, 8], # random entropy max_distance
[2, 6, 7, 12], # random gini min_distance
[3, 9, 10, 11], # random gini max_samples
[2, 5, 8, 12], # random gini max_distance
]
X, y = load_dataset(self._random_state, n_features=12)
X, y = load_wine(return_X_y=True)
rn = 0
for splitter_type in ["best", "random"]:
for criterion in ["gini", "entropy"]:
for criteria in ["min_distance", "max_samples"]:
for criterion in ["entropy", "gini"]:
for criteria in [
"min_distance",
"max_samples",
"max_distance",
]:
tcl = self.build(
splitter_type=splitter_type,
criterion=criterion,
criteria=criteria,
random_state=self._random_state,
)
expected = expected_values.pop(0)
dataset, computed = tcl.get_subspace(X, y, max_features=3)
random.seed(rn)
rn += 1
dataset, computed = tcl.get_subspace(X, y, max_features=4)
# print(
# "{}, # {:7s}{:8s}{:15s}".format(
# list(computed), splitter_type, criterion,
# criteria,
# )
# )
self.assertListEqual(expected, list(computed))
self.assertListEqual(
X[:, computed].tolist(), dataset.tolist()

View File

@@ -1,8 +1,11 @@
# type: ignore
import os
import unittest
import warnings
import numpy as np
from sklearn.datasets import load_iris
from sklearn.datasets import load_iris, load_wine
from sklearn.exceptions import ConvergenceWarning
from stree import Stree, Snode
from .utils import load_dataset
@@ -39,10 +42,7 @@ class Stree_test(unittest.TestCase):
_, count_u = np.unique(y_up, return_counts=True)
#
for i in unique_y:
try:
number_down = count_d[i]
except IndexError:
number_down = 0
number_down = count_d[i]
try:
number_up = count_u[i]
except IndexError:
@@ -59,33 +59,12 @@ class Stree_test(unittest.TestCase):
def test_build_tree(self):
    """Fitted tree structure must agree with the node models' splits."""
    import warnings

    warnings.filterwarnings("ignore")
    for k in self._kernels:
        model = Stree(kernel=k, random_state=self._random_state)
        model.fit(*load_dataset(self._random_state))
        self._check_tree(model.tree_)
@staticmethod
def _find_out(px: np.array, x_original: np.array, y_original) -> list:
"""Find the original values of y for a given array of samples
Arguments:
px {np.array} -- array of samples to search for
x_original {np.array} -- original dataset
y_original {[type]} -- original classes
Returns:
np.array -- classes of the given samples
"""
res = []
for needle in px:
for row in range(x_original.shape[0]):
if all(x_original[row, :] == needle):
res.append(y_original[row])
return res
def test_single_prediction(self):
X, y = load_dataset(self._random_state)
for kernel in self._kernels:
@@ -102,22 +81,6 @@ class Stree_test(unittest.TestCase):
yp = clf.fit(X, y).predict(X[:num, :])
self.assertListEqual(y[:num].tolist(), yp.tolist())
def test_score(self):
    """Reported score must equal accuracy computed from predictions."""
    X, y = load_dataset(self._random_state)
    accuracies = [
        0.9506666666666667,
        0.9606666666666667,
        0.9433333333333334,
    ]
    for kernel, accuracy_expected in zip(self._kernels, accuracies):
        model = Stree(random_state=self._random_state, kernel=kernel)
        model.fit(X, y)
        score_value = model.score(X, y)
        predicted = model.predict(X)
        self.assertEqual(score_value, np.mean(predicted == y))
        self.assertAlmostEqual(accuracy_expected, score_value)
def test_single_vs_multiple_prediction(self):
"""Check if predicting sample by sample gives the same result as
predicting all samples at once
@@ -164,9 +127,6 @@ class Stree_test(unittest.TestCase):
@staticmethod
def test_is_a_sklearn_classifier():
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=RuntimeWarning)
from sklearn.utils.estimator_checks import check_estimator
@@ -239,6 +199,9 @@ class Stree_test(unittest.TestCase):
"min_distance linear": 0.9533333333333334,
"min_distance rbf": 0.836,
"min_distance poly": 0.9473333333333334,
"max_distance linear": 0.9533333333333334,
"max_distance rbf": 0.836,
"max_distance poly": 0.9473333333333334,
},
"Iris": {
"max_samples linear": 0.98,
@@ -247,11 +210,14 @@ class Stree_test(unittest.TestCase):
"min_distance linear": 0.98,
"min_distance rbf": 1.0,
"min_distance poly": 1.0,
"max_distance linear": 0.98,
"max_distance rbf": 1.0,
"max_distance poly": 1.0,
},
}
for name, dataset in datasets.items():
px, py = dataset
for criteria in ["max_samples", "min_distance"]:
for criteria in ["max_samples", "min_distance", "max_distance"]:
for kernel in self._kernels:
clf = Stree(
C=1e4,
@@ -274,7 +240,7 @@ class Stree_test(unittest.TestCase):
(None, 16),
]
clf = Stree()
clf.n_features_ = n_features
clf.n_features_in_ = n_features
for max_features, expected in expected_values:
clf.set_params(**dict(max_features=max_features))
computed = clf._initialize_max_features()
@@ -322,13 +288,151 @@ class Stree_test(unittest.TestCase):
with self.assertRaises(ValueError):
clf.predict(X[:, :3])
# Tests of score
def test_score_binary(self):
    """score() matches the manually computed accuracy for each kernel."""
    X, y = load_dataset(self._random_state)
    expected_accuracies = [
        0.9506666666666667,
        0.9606666666666667,
        0.9433333333333334,
    ]
    for kernel, expected in zip(self._kernels, expected_accuracies):
        clf = Stree(random_state=self._random_state, kernel=kernel)
        clf.fit(X, y)
        reported = clf.score(X, y)
        computed = np.mean(clf.predict(X) == y)
        self.assertEqual(reported, computed)
        self.assertAlmostEqual(expected, reported)
def test_score_max_features(self):
    """Restricting max_features still yields the expected accuracy."""
    X, y = load_dataset(self._random_state)
    clf = Stree(random_state=self._random_state, max_features=2)
    self.assertAlmostEqual(
        0.9426666666666667, clf.fit(X, y).score(X, y)
    )
def test_score_multi_class(self):
    """Multiclass accuracy per dataset / kernel / split-criteria combo.

    score() must equal the accuracy computed from predict(), and both
    must match the recorded reference values for the fixed random seed.
    """
    warnings.filterwarnings("ignore")
    # Reference accuracies, consumed with pop(0) in the exact nesting
    # order of the loops below: dataset -> kernel -> criteria.
    accuracies = [
        0.8258427, # Wine linear min_distance
        0.6741573, # Wine linear max_distance
        0.8314607, # Wine linear max_samples
        0.6629213, # Wine rbf min_distance
        1.0000000, # Wine rbf max_distance
        0.4044944, # Wine rbf max_samples
        0.9157303, # Wine poly min_distance
        1.0000000, # Wine poly max_distance
        0.7640449, # Wine poly max_samples
        0.9933333, # Iris linear min_distance
        0.9666667, # Iris linear max_distance
        0.9666667, # Iris linear max_samples
        0.9800000, # Iris rbf min_distance
        0.9800000, # Iris rbf max_distance
        0.9800000, # Iris rbf max_samples
        1.0000000, # Iris poly min_distance
        1.0000000, # Iris poly max_distance
        1.0000000, # Iris poly max_samples
        0.8993333, # Synthetic linear min_distance
        0.6533333, # Synthetic linear max_distance
        0.9313333, # Synthetic linear max_samples
        0.8320000, # Synthetic rbf min_distance
        0.6660000, # Synthetic rbf max_distance
        0.8320000, # Synthetic rbf max_samples
        0.6066667, # Synthetic poly min_distance
        0.6840000, # Synthetic poly max_distance
        0.6340000, # Synthetic poly max_samples
    ]
    # Two real datasets plus a synthetic 3-class one with 5 features.
    datasets = [
        ("Wine", load_wine(return_X_y=True)),
        ("Iris", load_iris(return_X_y=True)),
        (
            "Synthetic",
            load_dataset(self._random_state, n_classes=3, n_features=5),
        ),
    ]
    for dataset_name, dataset in datasets:
        X, y = dataset
        for kernel in self._kernels:
            for criteria in [
                "min_distance",
                "max_distance",
                "max_samples",
            ]:
                clf = Stree(
                    C=17,
                    random_state=self._random_state,
                    kernel=kernel,
                    split_criteria=criteria,
                    degree=5,
                    gamma="auto",
                )
                clf.fit(X, y)
                accuracy_score = clf.score(X, y)
                yp = clf.predict(X)
                accuracy_computed = np.mean(yp == y)
                # Debug helper kept for regenerating the table above:
                # print(
                #     "{:.7f}, # {:7} {:5} {}".format(
                #         accuracy_score, dataset_name, kernel, criteria
                #     )
                # )
                accuracy_expected = accuracies.pop(0)
                # score() must be exactly the computed mean accuracy...
                self.assertEqual(accuracy_score, accuracy_computed)
                # ...and (almost) equal to the recorded reference value.
                self.assertAlmostEqual(accuracy_expected, accuracy_score)
def test_bogus_splitter_parameter(self):
    """An unknown splitter name must raise ValueError when fitting."""
    with self.assertRaises(ValueError):
        Stree(splitter="duck").fit(*load_dataset())
def test_weights_removing_class(self):
    """Zero sample weights that would erase a class get patched.

    # This patch solves an stderr message from sklearn svm lib
    # "WARNING: class label x specified in weight is not found"
    """
    # Three clusters: class 0 near the origin, class 1 far away.
    X = np.array(
        [
            [0.1, 0.1],
            [0.1, 0.2],
            [0.2, 0.1],
            [5, 6],
            [8, 9],
            [6, 7],
            [0.2, 0.2],
        ]
    )
    y = np.array([0, 0, 0, 1, 1, 1, 0])
    epsilon = 1e-5
    # weights zero out every class-1 sample, so class 1 would vanish
    weights = [1, 1, 1, 0, 0, 0, 1]
    weights = np.array(weights, dtype="float64")
    weights_epsilon = [x + epsilon for x in weights]
    # here one class-1 sample keeps a non-zero weight -> no patch needed
    weights_no_zero = np.array([1, 1, 1, 0, 0, 2, 1])
    original = weights_no_zero.copy()
    clf = Stree()
    clf.fit(X, y)
    node = clf.train(X, y, weights, 1, "test",)
    # if a class is lost with zero weights the patch adds epsilon
    # NOTE(review): this assertion implies train() mutates `weights`
    # in place (adding epsilon) — confirm against Stree.train.
    self.assertListEqual(weights.tolist(), weights_epsilon)
    self.assertListEqual(node._sample_weight.tolist(), weights_epsilon)
    # zero weights are ok when they don't erase a class
    _ = clf.train(X, y, weights_no_zero, 1, "test")
    self.assertListEqual(weights_no_zero.tolist(), original.tolist())
def test_build_predictor(self):
    """_build_predictor must fill class/belief on nodes and must
    refuse to run when the classifier has no tree (tree_ is None)."""
    X, y = load_dataset(self._random_state)
    clf = Stree(random_state=self._random_state)
    # calling the predictor builder on an unfitted tree is an error
    with self.assertRaises(ValueError):
        clf.tree_ = None
        clf._build_predictor()
    clf.fit(X, y)
    node = clf.tree_.get_down().get_down()
    # reference values for this node given the fixed random seed
    expected_impurity = 0.04686951386893923
    expected_class = 1
    expected_belief = 0.9759887005649718
    self.assertAlmostEqual(expected_impurity, node._impurity)
    self.assertAlmostEqual(expected_belief, node._belief)
    self.assertEqual(expected_class, node._class)
    # wipe the prediction attributes and check they are rebuilt
    node._belief = 0.0
    node._class = None
    clf._build_predictor()
    node = clf.tree_.get_down().get_down()
    self.assertAlmostEqual(expected_belief, node._belief)
    self.assertEqual(expected_class, node._class)

View File

@@ -1,3 +1,4 @@
# type: ignore
from .Stree_test import Stree_test
from .Snode_test import Snode_test
from .Splitter_test import Splitter_test

View File

@@ -1,3 +1,4 @@
# type: ignore
from sklearn.datasets import make_classification