Mirror of https://github.com/Doctorado-ML/STree.git, synced 2025-08-17 00:16:07 +00:00

Compare commits: add_subspa... → mypy-stati... (10 Commits)
Author | SHA1 | Date
---|---|---
 | d1e30a3372 |
 | fa001f97a4 |
 | be552fdd6c |
 | 5e3a8e3ec5 |
 | 554ec03c32 |
 | 4b7e4a3fb0 |
 | 76723993fd |
 | ecd0b86f4d |
 | 3e52a4746c |
 | a20e45e8e7 |
@@ -10,5 +10,4 @@ exclude_lines =
     if __name__ == .__main__.:
 ignore_errors = True
 omit =
     stree/tests/*
     stree/__init__.py
4 .gitignore vendored
@@ -130,4 +130,6 @@ dmypy.json
 
 .idea
 .vscode
 .pre-commit-config.yaml
+
+**.csv
@@ -4,7 +4,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"# Test AdaBoost with different configurations"
+"# Test Stree with AdaBoost and Bagging with different configurations"
 ]
 },
 {
@@ -34,11 +34,8 @@
 "outputs": [],
 "source": [
 "import time\n",
-"from sklearn.ensemble import AdaBoostClassifier\n",
-"from sklearn.tree import DecisionTreeClassifier\n",
-"from sklearn.svm import LinearSVC, SVC\n",
-"from sklearn.model_selection import GridSearchCV, train_test_split\n",
-"from sklearn.datasets import load_iris\n",
+"from sklearn.ensemble import AdaBoostClassifier, BaggingClassifier\n",
+"from sklearn.model_selection import train_test_split\n",
 "from stree import Stree"
 ]
 },
@@ -57,12 +54,14 @@
 {
 "cell_type": "code",
 "execution_count": 4,
-"metadata": {},
+"metadata": {
+"tags": []
+},
 "outputs": [
 {
 "output_type": "stream",
 "name": "stdout",
-"text": "Fraud: 0.173% 492\nValid: 99.827% 284315\nX.shape (100492, 28) y.shape (100492,)\nFraud: 0.659% 662\nValid: 99.341% 99830\n"
+"text": "Fraud: 0.173% 492\nValid: 99.827% 284315\nX.shape (100492, 28) y.shape (100492,)\nFraud: 0.644% 647\nValid: 99.356% 99845\n"
 }
 ],
 "source": [
@@ -117,18 +116,20 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"## STree alone on the whole dataset and linear kernel"
+"## STree alone with 100.000 samples and linear kernel"
 ]
 },
 {
 "cell_type": "code",
 "execution_count": 5,
-"metadata": {},
+"metadata": {
+"tags": []
+},
 "outputs": [
 {
 "output_type": "stream",
 "name": "stdout",
-"text": "Score Train: 0.9985499829409757\nScore Test: 0.998407854584052\nTook 39.45 seconds\n"
+"text": "Score Train: 0.9985784146480154\nScore Test: 0.9981093273185617\nTook 73.27 seconds\n"
 }
 ],
 "source": [
@@ -144,7 +145,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"## Different kernels with different configuations"
+"## Adaboost"
 ]
 },
 {
@@ -161,18 +162,20 @@
 {
 "cell_type": "code",
 "execution_count": 7,
-"metadata": {},
+"metadata": {
+"tags": []
+},
 "outputs": [
 {
 "output_type": "stream",
 "name": "stdout",
-"text": "Kernel: linear\tTime: 87.00 seconds\tScore Train: 0.9982372\tScore Test: 0.9981425\nKernel: rbf\tTime: 60.60 seconds\tScore Train: 0.9934181\tScore Test: 0.9933992\nKernel: poly\tTime: 88.08 seconds\tScore Train: 0.9937450\tScore Test: 0.9938968\n"
+"text": "Kernel: linear\tTime: 93.78 seconds\tScore Train: 0.9983083\tScore Test: 0.9983083\nKernel: rbf\tTime: 18.32 seconds\tScore Train: 0.9935602\tScore Test: 0.9935651\nKernel: poly\tTime: 69.68 seconds\tScore Train: 0.9973132\tScore Test: 0.9972801\n"
 }
 ],
 "source": [
 "for kernel in ['linear', 'rbf', 'poly']:\n",
 "    now = time.time()\n",
-"    clf = AdaBoostClassifier(Stree(C=7, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state)\n",
+"    clf = AdaBoostClassifier(base_estimator=Stree(C=C, kernel=kernel, max_depth=max_depth, random_state=random_state), algorithm=\"SAMME\", n_estimators=n_estimators, random_state=random_state)\n",
 "    clf.fit(Xtrain, ytrain)\n",
 "    score_train = clf.score(Xtrain, ytrain)\n",
 "    score_test = clf.score(Xtest, ytest)\n",
@@ -183,24 +186,37 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"## Test algorithm SAMME in AdaBoost to check speed/accuracy"
+"## Bagging"
 ]
 },
 {
 "cell_type": "code",
 "execution_count": 8,
 "metadata": {},
+"outputs": [],
+"source": [
+"n_estimators = 10\n",
+"C = 7\n",
+"max_depth = 3"
+]
+},
+{
+"cell_type": "code",
+"execution_count": 9,
+"metadata": {
+"tags": []
+},
 "outputs": [
 {
 "output_type": "stream",
 "name": "stdout",
-"text": "Kernel: linear\tTime: 58.75 seconds\tScore Train: 0.9980524\tScore Test: 0.9978771\nKernel: rbf\tTime: 12.49 seconds\tScore Train: 0.9934181\tScore Test: 0.9933992\nKernel: poly\tTime: 97.85 seconds\tScore Train: 0.9972137\tScore Test: 0.9971806\n"
+"text": "Kernel: linear\tTime: 387.06 seconds\tScore Train: 0.9985784\tScore Test: 0.9981093\nKernel: rbf\tTime: 144.00 seconds\tScore Train: 0.9992750\tScore Test: 0.9983415\nKernel: poly\tTime: 101.78 seconds\tScore Train: 0.9992466\tScore Test: 0.9981757\n"
 }
 ],
 "source": [
 "for kernel in ['linear', 'rbf', 'poly']:\n",
 "    now = time.time()\n",
-"    clf = AdaBoostClassifier(Stree(C=7, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state, algorithm=\"SAMME\")\n",
+"    clf = BaggingClassifier(base_estimator=Stree(C=C, kernel=kernel, max_depth=max_depth, random_state=random_state), n_estimators=n_estimators, random_state=random_state)\n",
 "    clf.fit(Xtrain, ytrain)\n",
 "    score_train = clf.score(Xtrain, ytrain)\n",
 "    score_test = clf.score(Xtest, ytest)\n",
@@ -223,7 +239,7 @@
 },
 "orig_nbformat": 2,
 "kernelspec": {
-"name": "python37664bitgeneralvenvfbd0a23e74cf4e778460f5ffc6761f39",
+"name": "python37664bitgeneralvenve3128601eb614c5da59c5055670b6040",
 "display_name": "Python 3.7.6 64-bit ('general': venv)"
 }
 },
File diff suppressed because one or more lines are too long
379 stree/Strees.py
@@ -6,24 +6,24 @@ __version__ = "0.9"
 Build an oblique tree classifier based on SVM Trees
 """
 
+from __future__ import annotations
 import os
 import numbers
 import random
 import warnings
+from typing import Optional, List, Union, Tuple
+from math import log
 from itertools import combinations
-import numpy as np
-from sklearn.base import BaseEstimator, ClassifierMixin
-from sklearn.svm import SVC, LinearSVC
-from sklearn.utils import check_consistent_length
-from sklearn.utils.multiclass import check_classification_targets
-from sklearn.exceptions import ConvergenceWarning
-from sklearn.utils.validation import (
-    check_X_y,
-    check_array,
+import numpy as np  # type: ignore
+from sklearn.base import BaseEstimator, ClassifierMixin  # type: ignore
+from sklearn.svm import SVC, LinearSVC  # type: ignore
+from sklearn.utils.multiclass import (  # type: ignore
+    check_classification_targets,
+)
+from sklearn.exceptions import ConvergenceWarning  # type: ignore
+from sklearn.utils.validation import (  # type: ignore
     check_is_fitted,
     _check_sample_weight,
 )
-from sklearn.metrics._classification import _weighted_sum, _check_targets
 
 
 class Snode:
@@ -33,29 +33,33 @@ class Snode:
 
     def __init__(
         self,
-        clf: SVC,
+        clf: Union[SVC, LinearSVC],
         X: np.ndarray,
        y: np.ndarray,
         features: np.array,
         impurity: float,
         title: str,
+        weight: np.ndarray = None,
     ):
-        self._clf = clf
-        self._title = title
-        self._belief = 0.0
+        self._clf: Union[SVC, LinearSVC] = clf
+        self._title: str = title
+        self._belief: float = 0.0
         # Only store dataset in Testing
-        self._X = X if os.environ.get("TESTING", "NS") != "NS" else None
-        self._y = y
-        self._down = None
-        self._up = None
+        self._X: Optional[np.array] = X if os.environ.get(
+            "TESTING", "NS"
+        ) != "NS" else None
+        self._y: np.array = y
+        self._down: Optional[Snode] = None
+        self._up: Optional[Snode] = None
         self._class = None
         self._feature = None
-        self._sample_weight = None
-        self._features = features
-        self._impurity = impurity
+        self._sample_weight: Optional[np.array] = (
+            weight if os.environ.get("TESTING", "NS") != "NS" else None
+        )
+        self._features: Tuple[int, ...] = features
+        self._impurity: float = impurity
 
     @classmethod
-    def copy(cls, node: "Snode") -> "Snode":
+    def copy(cls, node: Snode) -> Snode:
         return cls(
             node._clf,
             node._X,
@@ -65,22 +69,22 @@ class Snode:
             node._title,
         )
 
-    def set_down(self, son):
+    def set_down(self, son: Snode) -> None:
         self._down = son
 
-    def set_up(self, son):
+    def set_up(self, son: Snode) -> None:
         self._up = son
 
     def is_leaf(self) -> bool:
         return self._up is None and self._down is None
 
-    def get_down(self) -> "Snode":
+    def get_down(self) -> Optional[Snode]:
         return self._down
 
-    def get_up(self) -> "Snode":
+    def get_up(self) -> Optional[Snode]:
         return self._up
 
-    def make_predictor(self):
+    def make_predictor(self) -> None:
         """Compute the class of the predictor and its belief based on the
         subdataset of the node only if it is a leaf
         """
@@ -119,11 +123,11 @@ class Siterator:
     """Stree preorder iterator
     """
 
-    def __init__(self, tree: Snode):
-        self._stack = []
+    def __init__(self, tree: Optional[Snode]):
+        self._stack: List[Snode] = []
         self._push(tree)
 
-    def _push(self, node: Snode):
+    def _push(self, node: Optional[Snode]) -> None:
         if node is not None:
             self._stack.append(node)
 
@@ -139,21 +143,21 @@ class Siterator:
 class Splitter:
     def __init__(
         self,
-        clf: SVC = None,
-        criterion: str = None,
-        splitter_type: str = None,
-        criteria: str = None,
-        min_samples_split: int = None,
-        random_state=None,
+        clf: Union[SVC, LinearSVC] = None,
+        criterion: str = "",
+        splitter_type: str = "",
+        criteria: str = "",
+        min_samples_split: int = 0,
+        random_state: Optional[int] = None,
     ):
-        self._clf = clf
-        self._random_state = random_state
+        self._clf: Union[SVC, LinearSVC] = clf
+        self._random_state: Optional[int] = random_state
         if random_state is not None:
            random.seed(random_state)
-        self._criterion = criterion
-        self._min_samples_split = min_samples_split
-        self._criteria = criteria
-        self._splitter_type = splitter_type
+        self._criterion: str = criterion
+        self._min_samples_split: int = min_samples_split
+        self._criteria: str = criteria
+        self._splitter_type: str = splitter_type
 
         if clf is None:
             raise ValueError(f"clf has to be a sklearn estimator, got({clf})")
@@ -163,10 +167,10 @@ class Splitter:
                 f"criterion must be gini or entropy got({criterion})"
             )
 
-        if criteria not in ["min_distance", "max_samples"]:
+        if criteria not in ["min_distance", "max_samples", "max_distance"]:
             raise ValueError(
-                f"split_criteria has to be min_distance or \
-                    max_samples got ({criteria})"
+                "split_criteria has to be min_distance "
+                f"max_distance or max_samples got ({criteria})"
             )
 
         if splitter_type not in ["random", "best"]:
@@ -182,29 +186,55 @@ class Splitter:
     @staticmethod
     def _gini(y: np.array) -> float:
         _, count = np.unique(y, return_counts=True)
-        return 1 - np.sum(np.square(count / np.sum(count)))
+        return float(1 - np.sum(np.square(count / np.sum(count))))
 
     @staticmethod
     def _entropy(y: np.array) -> float:
-        _, count = np.unique(y, return_counts=True)
-        proportion = count / np.sum(count)
-        return -np.sum(proportion * np.log2(proportion))
+        n_labels = len(y)
+        if n_labels <= 1:
+            return 0
+        counts = np.bincount(y)
+        proportions = counts / n_labels
+        n_classes = np.count_nonzero(proportions)
+        if n_classes <= 1:
+            return 0
+        entropy = 0.0
+        # Compute standard entropy.
+        for prop in proportions:
+            if prop != 0.0:
+                entropy -= prop * log(prop, n_classes)
+        return entropy
 
     def information_gain(
-        self, labels_up: np.array, labels_dn: np.array
+        self, labels: np.array, labels_up: np.array, labels_dn: np.array
     ) -> float:
-        card_up = labels_up.shape[0] if labels_up is not None else 0
-        card_dn = labels_dn.shape[0] if labels_dn is not None else 0
+        imp_prev = self.criterion_function(labels)
+        card_up = card_dn = imp_up = imp_dn = 0
+        if labels_up is not None:
+            card_up = labels_up.shape[0]
+            imp_up = self.criterion_function(labels_up)
+        if labels_dn is not None:
+            card_dn = labels_dn.shape[0] if labels_dn is not None else 0
+            imp_dn = self.criterion_function(labels_dn)
         samples = card_up + card_dn
-        up = card_up / samples * self.criterion_function(labels_up)
-        dn = card_dn / samples * self.criterion_function(labels_dn)
-        return up + dn
+        if samples == 0:
+            return 0.0
+        else:
+            result = float(
+                imp_prev
+                - (card_up / samples) * imp_up
+                - (card_dn / samples) * imp_dn
+            )
+            return result
 
     def _select_best_set(
-        self, dataset: np.array, labels: np.array, features_sets: list
-    ) -> list:
-        min_impurity = 1
-        selected = None
+        self,
+        dataset: np.array,
+        labels: np.array,
+        features_sets: List[Tuple[int, ...]],
+    ) -> Tuple[int, ...]:
+        max_gain: float = 0.0
+        selected: Union[Tuple[int, ...], None] = None
         warnings.filterwarnings("ignore", category=ConvergenceWarning)
         for feature_set in features_sets:
             self._clf.fit(dataset[:, feature_set], labels)
@@ -213,11 +243,11 @@ class Splitter:
             )
             self.partition(dataset, node)
             y1, y2 = self.part(labels)
-            impurity = self.information_gain(y1, y2)
-            if impurity < min_impurity:
-                min_impurity = impurity
+            gain = self.information_gain(labels, y1, y2)
+            if gain > max_gain:
+                max_gain = gain
                 selected = feature_set
-        return selected
+        return selected if selected is not None else feature_set
 
     def _get_subspaces_set(
         self, dataset: np.array, labels: np.array, max_features: int
@@ -226,37 +256,76 @@ class Splitter:
         features_sets = list(combinations(features, max_features))
         if len(features_sets) > 1:
             if self._splitter_type == "random":
-                return features_sets[random.randint(0, len(features_sets) - 1)]
+                index = random.randint(0, len(features_sets) - 1)
+                return features_sets[index]
             else:
+                # get only 3 sets at most
+                if len(features_sets) > 3:
+                    features_sets = random.sample(features_sets, 3)
                 return self._select_best_set(dataset, labels, features_sets)
         else:
             return features_sets[0]
 
     def get_subspace(
         self, dataset: np.array, labels: np.array, max_features: int
-    ) -> list:
+    ) -> Tuple[np.array, np.array]:
         """Return the best subspace to make a split
         """
         indices = self._get_subspaces_set(dataset, labels, max_features)
         return dataset[:, indices], indices
 
     @staticmethod
-    def _min_distance(data: np.array, _) -> np.array:
-        # chooses the lowest distance of every sample
-        indices = np.argmin(np.abs(data), axis=1)
-        return np.array(
-            [data[x, y] for x, y in zip(range(len(data[:, 0])), indices)]
-        )
+    def _min_distance(data: np.array, _: np.array) -> np.array:
+        """Assign class to min distances
+
+        return a vector of classes so partition can separate class 0 from
+        the rest of classes, ie. class 0 goes to one splitted node and the
+        rest of classes go to the other
+        :param data: distances to hyper plane of every class
+        :type data: np.array (m, n_classes)
+        :param _: enable call compat with other measures
+        :type _: None
+        :return: vector with the class assigned to each sample
+        :rtype: np.array shape (m,)
+        """
+        return np.argmin(data, axis=1)
+
+    @staticmethod
+    def _max_distance(data: np.array, _: np.array) -> np.array:
+        """Assign class to max distances
+
+        return a vector of classes so partition can separate class 0 from
+        the rest of classes, ie. class 0 goes to one splitted node and the
+        rest of classes go to the other
+        :param data: distances to hyper plane of every class
+        :type data: np.array (m, n_classes)
+        :param _: enable call compat with other measures
+        :type _: None
+        :return: vector with the class assigned to each sample values
+            (can be 0, 1, ...)
+        :rtype: np.array shape (m,)
+        """
+        return np.argmax(data, axis=1)
 
     @staticmethod
     def _max_samples(data: np.array, y: np.array) -> np.array:
         """return distances of the class with more samples
 
         :param data: distances to hyper plane of every class
         :type data: np.array (m, n_classes)
         :param y: vector of labels (classes)
         :type y: np.array (m,)
         :return: vector with distances to hyperplane (can be positive or neg.)
         :rtype: np.array shape (m,)
         """
         # select the class with max number of samples
         _, samples = np.unique(y, return_counts=True)
         selected = np.argmax(samples)
         return data[:, selected]
 
-    def partition(self, samples: np.array, node: Snode):
-        """Set the criteria to split arrays
+    def partition(self, samples: np.array, node: Snode) -> None:
+        """Set the criteria to split arrays. Compute the indices of the samples
+        that should go to one side of the tree (down)
 
         """
         data = self._distances(node, samples)
@@ -282,7 +351,7 @@ class Splitter:
         """
         return node._clf.decision_function(data[:, node._features])
 
-    def part(self, origin: np.array) -> list:
+    def part(self, origin: np.array) -> Tuple[np.array, np.array]:
         """Split an array in two based on indices (down) and its complement
 
         :param origin: dataset to split
@@ -293,13 +362,13 @@ class Splitter:
         :rtype: list
         """
         up = ~self._down
-        return [
+        return (
             origin[up] if any(up) else None,
             origin[self._down] if any(self._down) else None,
-        ]
+        )
 
 
-class Stree(BaseEstimator, ClassifierMixin):
+class Stree(BaseEstimator, ClassifierMixin):  # type: ignore
     """Estimator that is based on binary trees of svm nodes
     can deal with sample_weights in predict, used in boosting sklearn methods
     inheriting from BaseEstimator implements get_params and set_params methods
@@ -312,42 +381,34 @@ class Stree(BaseEstimator, ClassifierMixin):
         C: float = 1.0,
         kernel: str = "linear",
         max_iter: int = 1000,
-        random_state: int = None,
-        max_depth: int = None,
+        random_state: Optional[int] = None,
+        max_depth: Optional[int] = None,
         tol: float = 1e-4,
         degree: int = 3,
-        gamma="scale",
+        gamma: Union[float, str] = "scale",
         split_criteria: str = "max_samples",
         criterion: str = "gini",
         min_samples_split: int = 0,
-        max_features=None,
+        max_features: Optional[Union[str, int, float]] = None,
         splitter: str = "random",
     ):
         self.max_iter = max_iter
-        self.C = C
-        self.kernel = kernel
-        self.random_state = random_state
-        self.max_depth = max_depth
-        self.tol = tol
-        self.gamma = gamma
-        self.degree = degree
-        self.min_samples_split = min_samples_split
-        self.split_criteria = split_criteria
-        self.max_features = max_features
-        self.criterion = criterion
-        self.splitter = splitter
-
-    def _more_tags(self) -> dict:
-        """Required by sklearn to supply features of the classifier
-
-        :return: the tag required
-        :rtype: dict
-        """
-        return {"requires_y": True}
+        self.C: float = C
+        self.kernel: str = kernel
+        self.random_state: Optional[int] = random_state
+        self.max_depth: Optional[int] = max_depth
+        self.tol: float = tol
+        self.gamma: Union[float, str] = gamma
+        self.degree: int = degree
+        self.min_samples_split: int = min_samples_split
+        self.split_criteria: str = split_criteria
+        self.max_features: Union[str, int, float, None] = max_features
+        self.criterion: str = criterion
+        self.splitter: str = splitter
 
     def fit(
         self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None
-    ) -> "Stree":
+    ) -> Stree:
         """Build the tree based on the dataset of samples and its labels
 
         :param X: dataset of samples to make predictions
@@ -376,11 +437,11 @@ class Stree(BaseEstimator, ClassifierMixin):
                 f"Maximum depth has to be greater than 1... got (max_depth=\
                     {self.max_depth})"
             )
 
         check_classification_targets(y)
-        X, y = check_X_y(X, y)
-        sample_weight = _check_sample_weight(sample_weight, X)
+        X, y = self._validate_data(X, y)
+        sample_weight = _check_sample_weight(
+            sample_weight, X, dtype=np.float64
+        )
         # Initialize computed parameters
         self.splitter_ = Splitter(
             clf=self._build_clf(),
@@ -396,8 +457,6 @@ class Stree(BaseEstimator, ClassifierMixin):
         self.n_classes_ = self.classes_.shape[0]
         self.n_iter_ = self.max_iter
         self.depth_ = 0
-        self.n_features_ = X.shape[1]
-        self.n_features_in_ = X.shape[1]
         self.max_features_ = self._initialize_max_features()
         self.tree_ = self.train(X, y, sample_weight, 1, "root")
         self._build_predictor()
@@ -410,7 +469,7 @@ class Stree(BaseEstimator, ClassifierMixin):
         sample_weight: np.ndarray,
         depth: int,
         title: str,
-    ) -> Snode:
+    ) -> Optional[Snode]:
         """Recursive function to split the original dataset into predictor
         nodes (leaves)
 
@@ -439,13 +498,22 @@ class Stree(BaseEstimator, ClassifierMixin):
                 features=X.shape[1],
                 impurity=0.0,
                 title=title + ", <pure>",
+                weight=sample_weight,
             )
         # Train the model
         clf = self._build_clf()
         Xs, features = self.splitter_.get_subspace(X, y, self.max_features_)
+        # solve WARNING: class label 0 specified in weight is not found
+        # in bagging
+        if any(sample_weight == 0):
+            indices = sample_weight == 0
+            y_next = y[~indices]
+            # touch weights if removing any class
+            if np.unique(y_next).shape[0] != self.n_classes_:
+                sample_weight += 1e-5
         clf.fit(Xs, y, sample_weight=sample_weight)
         impurity = self.splitter_.impurity(y)
-        node = Snode(clf, X, y, features, impurity, title)
+        node = Snode(clf, X, y, features, impurity, title, sample_weight)
         self.depth_ = max(depth, self.depth_)
         self.splitter_.partition(X, node)
         X_U, X_D = self.splitter_.part(X)
@@ -460,16 +528,27 @@ class Stree(BaseEstimator, ClassifierMixin):
                 features=X.shape[1],
                 impurity=impurity,
                 title=title + ", <cgaf>",
+                weight=sample_weight,
             )
-        node.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up"))
-        node.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down"))
+        node.set_up(
+            self.train(  # type: ignore
+                X_U, y_u, sw_u, depth + 1, title + " - Up"
+            )
+        )
+        node.set_down(
+            self.train(  # type: ignore
+                X_D, y_d, sw_d, depth + 1, title + " - Down"
+            )
+        )
         return node
 
-    def _build_predictor(self):
+    def _build_predictor(self) -> None:
         """Process the leaves to make them predictors
         """
 
-        def run_tree(node: Snode):
+        def run_tree(node: Optional[Snode]) -> None:
+            if node is None:
+                raise ValueError("Can't build predictors on None")
             if node.is_leaf():
                 node.make_predictor()
                 return
@@ -478,7 +557,7 @@ class Stree(BaseEstimator, ClassifierMixin):
 
         run_tree(self.tree_)
 
-    def _build_clf(self):
+    def _build_clf(self) -> Union[LinearSVC, SVC]:
         """ Build the correct classifier for the node
         """
         return (
@@ -527,30 +606,30 @@ class Stree(BaseEstimator, ClassifierMixin):
         """
 
         def predict_class(
-            xp: np.array, indices: np.array, node: Snode
+            xp: np.array, indices: np.array, node: Optional[Snode]
        ) -> np.array:
             if xp is None:
                 return [], []
-            if node.is_leaf():
+            if node.is_leaf():  # type: ignore
                 # set a class for every sample in dataset
-                prediction = np.full((xp.shape[0], 1), node._class)
+                prediction = np.full(
+                    (xp.shape[0], 1), node._class  # type: ignore
+                )
                 return prediction, indices
-            self.splitter_.partition(xp, node)
+            self.splitter_.partition(xp, node)  # type: ignore
             x_u, x_d = self.splitter_.part(xp)
             i_u, i_d = self.splitter_.part(indices)
-            prx_u, prin_u = predict_class(x_u, i_u, node.get_up())
-            prx_d, prin_d = predict_class(x_d, i_d, node.get_down())
+            prx_u, prin_u = predict_class(
+                x_u, i_u, node.get_up()  # type: ignore
+            )
+            prx_d, prin_d = predict_class(
+                x_d, i_d, node.get_down()  # type: ignore
+            )
             return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
 
         # sklearn check
-        check_is_fitted(self, ["tree_"])
+        check_is_fitted(self, "n_features_in_")
         # Input validation
-        X = check_array(X)
-        if X.shape[1] != self.n_features_:
-            raise ValueError(
-                f"Expected {self.n_features_} features but got "
-                f"({X.shape[1]})"
-            )
+        X = self._validate_data(X, reset=False)
         # setup prediction & make it happen
         indices = np.arange(X.shape[0])
         result = (
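In predict, the hand-rolled shape check is gone: check_is_fitted(self, "n_features_in_") plus self._validate_data(X, reset=False) (available since scikit-learn 0.23) performs the same feature-count verification and raises the standard sklearn ValueError. A hedged illustration of that built-in behavior with a stock estimator:

import numpy as np
from sklearn.linear_model import LogisticRegression

X = np.arange(24, dtype=float).reshape(6, 4)
clf = LogisticRegression().fit(X, [0, 1, 0, 1, 0, 1])
try:
    clf.predict(np.zeros((2, 3)))  # 3 features instead of the 4 seen in fit
except ValueError as exc:
    print(exc)  # sklearn's own mismatch message, no manual check needed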
@@ -560,32 +639,6 @@ class Stree(BaseEstimator, ClassifierMixin):
         )
         return self.classes_[result]
 
-    def score(
-        self, X: np.array, y: np.array, sample_weight: np.array = None
-    ) -> float:
-        """Compute accuracy of the prediction
-
-        :param X: dataset of samples to make predictions
-        :type X: np.array
-        :param y_true: samples labels
-        :type y_true: np.array
-        :param sample_weight: weights of the samples. Rescale C per sample.
-        Hi' weights force the classifier to put more emphasis on these points
-        :type sample_weight: np.array optional
-        :return: accuracy of the prediction
-        :rtype: float
-        """
-        # sklearn check
-        check_is_fitted(self)
-        check_classification_targets(y)
-        X, y = check_X_y(X, y)
-        y_pred = self.predict(X).reshape(y.shape)
-        # Compute accuracy for each possible representation
-        _, y_true, y_pred = _check_targets(y, y_pred)
-        check_consistent_length(y_true, y_pred, sample_weight)
-        score = y_true == y_pred
-        return _weighted_sum(score, sample_weight, normalize=True)
 
     def __iter__(self) -> Siterator:
         """Create an iterator to be able to visit the nodes of the tree in
         preorder, can make a list with all the nodes in preorder
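Deleting score (and with it the private _weighted_sum/_check_targets imports) is safe because ClassifierMixin.score already computes sample-weighted accuracy; the custom version duplicated it. Equivalence sketch:

import numpy as np
from sklearn.metrics import accuracy_score

y_true = np.array([0, 1, 1, 0])
y_pred = np.array([0, 1, 0, 0])
# ClassifierMixin.score(X, y) is accuracy_score(y, self.predict(X))
assert accuracy_score(y_true, y_pred) == np.mean(y_true == y_pred) == 0.75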
@@ -613,11 +666,11 @@ class Stree(BaseEstimator, ClassifierMixin):
     def _initialize_max_features(self) -> int:
         if isinstance(self.max_features, str):
             if self.max_features == "auto":
-                max_features = max(1, int(np.sqrt(self.n_features_)))
+                max_features = max(1, int(np.sqrt(self.n_features_in_)))
             elif self.max_features == "sqrt":
-                max_features = max(1, int(np.sqrt(self.n_features_)))
+                max_features = max(1, int(np.sqrt(self.n_features_in_)))
             elif self.max_features == "log2":
-                max_features = max(1, int(np.log2(self.n_features_)))
+                max_features = max(1, int(np.log2(self.n_features_in_)))
             else:
                 raise ValueError(
                     "Invalid value for max_features. "
@@ -625,13 +678,13 @@ class Stree(BaseEstimator, ClassifierMixin):
                     "'sqrt' or 'log2'."
                 )
         elif self.max_features is None:
-            max_features = self.n_features_
-        elif isinstance(self.max_features, numbers.Integral):
+            max_features = self.n_features_in_
+        elif isinstance(self.max_features, int):
             max_features = self.max_features
         else:  # float
            if self.max_features > 0.0:
                 max_features = max(
-                    1, int(self.max_features * self.n_features_)
+                    1, int(self.max_features * self.n_features_in_)
                 )
             else:
                 raise ValueError(
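The two hunks above swap the deprecated n_features_ attribute for sklearn's n_features_in_ and accept plain int instead of numbers.Integral. A hedged standalone sketch of the resolution rules, matching the (None, 16) style fixtures in the updated test:

import numpy as np

def initialize_max_features(max_features, n_features_in_):
    if isinstance(max_features, str):
        if max_features in ("auto", "sqrt"):
            return max(1, int(np.sqrt(n_features_in_)))
        if max_features == "log2":
            return max(1, int(np.log2(n_features_in_)))
        raise ValueError("Invalid value for max_features.")
    if max_features is None:
        return n_features_in_
    if isinstance(max_features, int):
        return max_features
    if max_features > 0.0:  # float fraction of the feature count
        return max(1, int(max_features * n_features_in_))
    raise ValueError("Invalid value for max_features.")

print(initialize_max_features("sqrt", 16))  # 4
print(initialize_max_features(0.5, 16))     # 8
print(initialize_max_features(None, 16))    # 16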
@@ -1,3 +1,4 @@
+# type: ignore
 import os
 import unittest
 
@@ -33,10 +34,7 @@ class Snode_test(unittest.TestCase):
         max_card = max(card)
         min_card = min(card)
         if len(classes) > 1:
-            try:
-                belief = max_card / (max_card + min_card)
-            except ZeroDivisionError:
-                belief = 0.0
+            belief = max_card / (max_card + min_card)
         else:
             belief = 1
         self.assertEqual(belief, node._belief)
@@ -1,11 +1,12 @@
+# type: ignore
 import os
 import unittest
 import random
 
 import numpy as np
-from sklearn.svm import LinearSVC
-
+from sklearn.svm import SVC
+from sklearn.datasets import load_wine, load_iris
 from stree import Splitter
 from .utils import load_dataset
 
 
 class Splitter_test(unittest.TestCase):
@@ -15,7 +16,7 @@ class Splitter_test(unittest.TestCase):
 
     @staticmethod
     def build(
-        clf=LinearSVC(),
+        clf=SVC,
         min_samples_split=0,
         splitter_type="random",
         criterion="gini",
|
||||
random_state=None,
|
||||
):
|
||||
return Splitter(
|
||||
clf=clf,
|
||||
clf=clf(random_state=random_state, kernel="rbf"),
|
||||
min_samples_split=min_samples_split,
|
||||
splitter_type=splitter_type,
|
||||
criterion=criterion,
|
||||
@@ -43,10 +44,14 @@ class Splitter_test(unittest.TestCase):
         with self.assertRaises(ValueError):
             self.build(criteria="duck")
         with self.assertRaises(ValueError):
-            self.build(clf=None)
+            _ = Splitter(clf=None)
         for splitter_type in ["best", "random"]:
             for criterion in ["gini", "entropy"]:
-                for criteria in ["min_distance", "max_samples"]:
+                for criteria in [
+                    "min_distance",
+                    "max_samples",
+                    "max_distance",
+                ]:
                     tcl = self.build(
                         splitter_type=splitter_type,
                         criterion=criterion,
@@ -57,30 +62,74 @@ class Splitter_test(unittest.TestCase):
             self.assertEqual(criteria, tcl._criteria)
 
     def test_gini(self):
-        y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
-        expected = 0.48
-        self.assertEqual(expected, Splitter._gini(y))
-        tcl = self.build(criterion="gini")
-        self.assertEqual(expected, tcl.criterion_function(y))
+        expected_values = [
+            ([0, 1, 1, 1, 1, 1, 0, 0, 0, 1], 0.48),
+            ([0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1], 0.7777777777777778),
+            ([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 2, 2, 2], 0.520408163265306),
+            ([0, 0, 1, 1, 1, 1, 0, 0], 0.5),
+            ([0, 0, 1, 1, 2, 2, 3, 3], 0.75),
+            ([0, 0, 1, 1, 1, 1, 1, 1], 0.375),
+            ([0], 0),
+            ([1, 1, 1, 1], 0),
+        ]
+        for labels, expected in expected_values:
+            self.assertAlmostEqual(expected, Splitter._gini(labels))
+            tcl = self.build(criterion="gini")
+            self.assertAlmostEqual(expected, tcl.criterion_function(labels))
 
     def test_entropy(self):
-        y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
-        expected = 0.9709505944546686
-        self.assertAlmostEqual(expected, Splitter._entropy(y))
-        tcl = self.build(criterion="entropy")
-        self.assertEqual(expected, tcl.criterion_function(y))
+        expected_values = [
+            ([0, 1, 1, 1, 1, 1, 0, 0, 0, 1], 0.9709505944546686),
+            ([0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1], 0.9111886696810589),
+            ([0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 2, 2, 2], 0.8120406807940999),
+            ([0, 0, 1, 1, 1, 1, 0, 0], 1),
+            ([0, 0, 1, 1, 2, 2, 3, 3], 1),
+            ([0, 0, 1, 1, 1, 1, 1, 1], 0.8112781244591328),
+            ([1], 0),
+            ([0, 0, 0, 0], 0),
+        ]
+        for labels, expected in expected_values:
+            self.assertAlmostEqual(expected, Splitter._entropy(labels))
+            tcl = self.build(criterion="entropy")
+            self.assertAlmostEqual(expected, tcl.criterion_function(labels))
 
     def test_information_gain(self):
-        yu = np.array([0, 1, 1, 1, 1, 1])
-        yd = np.array([0, 0, 0, 1])
-        values_expected = [
-            ("gini", 0.31666666666666665),
-            ("entropy", 0.7145247027726656),
+        expected_values = [
+            (
+                [0, 1, 1, 1, 1, 1],
+                [0, 0, 0, 1],
+                0.16333333333333333,
+                0.25642589168200297,
+            ),
+            (
+                [0, 1, 1, 2, 2, 3, 4, 5, 3, 2, 1, 1],
+                [5, 3, 2, 1, 1],
+                0.007381776239907684,
+                -0.03328610916207225,
+            ),
+            ([], [], 0.0, 0.0),
+            ([1], [], 0.0, 0.0),
+            ([], [1], 0.0, 0.0),
+            ([0, 0, 0, 0], [0, 0], 0.0, 0.0),
+            ([], [1, 1, 1, 2], 0.0, 0.0),
+            (None, [1, 2, 3], 0.0, 0.0),
+            ([1, 2, 3], None, 0.0, 0.0),
         ]
-        for criterion, expected in values_expected:
-            tcl = self.build(criterion=criterion)
-            computed = tcl.information_gain(yu, yd)
-            self.assertAlmostEqual(expected, computed)
+        for yu, yd, expected_gini, expected_entropy in expected_values:
+            yu = np.array(yu, dtype=np.int32) if yu is not None else None
+            yd = np.array(yd, dtype=np.int32) if yd is not None else None
+            if yu is not None and yd is not None:
+                complete = np.append(yu, yd)
+            elif yd is not None:
+                complete = yd
+            else:
+                complete = yu
+            tcl = self.build(criterion="gini")
+            computed = tcl.information_gain(complete, yu, yd)
+            self.assertAlmostEqual(expected_gini, computed)
+            tcl = self.build(criterion="entropy")
+            computed = tcl.information_gain(complete, yu, yd)
+            self.assertAlmostEqual(expected_entropy, computed)
 
     def test_max_samples(self):
         tcl = self.build(criteria="max_samples")
@@ -108,34 +157,73 @@ class Splitter_test(unittest.TestCase):
                 [0.1, 0.2, 0.3],
             ]
         )
-        expected = np.array([-0.1, 0.01, 0.5, 0.1])
+        expected = np.array([2, 2, 1, 0])
         computed = tcl._min_distance(data, None)
         self.assertEqual((4,), computed.shape)
         self.assertListEqual(expected.tolist(), computed.tolist())
 
+    def test_max_distance(self):
+        tcl = self.build(criteria="max_distance")
+        data = np.array(
+            [
+                [-0.1, 0.2, -0.3],
+                [0.7, 0.01, -0.1],
+                [0.7, -0.9, 0.5],
+                [0.1, 0.2, 0.3],
+            ]
+        )
+        expected = np.array([1, 0, 0, 2])
+        computed = tcl._max_distance(data, None)
+        self.assertEqual((4,), computed.shape)
+        self.assertListEqual(expected.tolist(), computed.tolist())
+
+    def test_best_splitter_few_sets(self):
+        X, y = load_iris(return_X_y=True)
+        X = np.delete(X, 3, 1)
+        tcl = self.build(splitter_type="best", random_state=self._random_state)
+        dataset, computed = tcl.get_subspace(X, y, max_features=2)
+        self.assertListEqual([0, 2], list(computed))
+        self.assertListEqual(X[:, computed].tolist(), dataset.tolist())
+
     def test_splitter_parameter(self):
         expected_values = [
-            [1, 7, 9],
-            [1, 7, 9],
-            [1, 7, 9],
-            [1, 7, 9],
-            [0, 5, 6],
-            [0, 5, 6],
-            [0, 5, 6],
-            [0, 5, 6],
+            [2, 3, 5, 7],  # best entropy min_distance
+            [0, 2, 4, 5],  # best entropy max_samples
+            [0, 2, 8, 12],  # best entropy max_distance
+            [1, 2, 5, 12],  # best gini min_distance
+            [0, 3, 4, 10],  # best gini max_samples
+            [1, 2, 9, 12],  # best gini max_distance
+            [3, 9, 11, 12],  # random entropy min_distance
+            [1, 5, 6, 9],  # random entropy max_samples
+            [1, 2, 4, 8],  # random entropy max_distance
+            [2, 6, 7, 12],  # random gini min_distance
+            [3, 9, 10, 11],  # random gini max_samples
+            [2, 5, 8, 12],  # random gini max_distance
         ]
-        X, y = load_dataset(self._random_state, n_features=12)
+        X, y = load_wine(return_X_y=True)
+        rn = 0
         for splitter_type in ["best", "random"]:
-            for criterion in ["gini", "entropy"]:
-                for criteria in ["min_distance", "max_samples"]:
+            for criterion in ["entropy", "gini"]:
+                for criteria in [
+                    "min_distance",
+                    "max_samples",
+                    "max_distance",
+                ]:
                     tcl = self.build(
                         splitter_type=splitter_type,
                         criterion=criterion,
                         criteria=criteria,
                         random_state=self._random_state,
                     )
                     expected = expected_values.pop(0)
-                    dataset, computed = tcl.get_subspace(X, y, max_features=3)
+                    random.seed(rn)
+                    rn += 1
+                    dataset, computed = tcl.get_subspace(X, y, max_features=4)
+                    # print(
+                    #     "{}, # {:7s}{:8s}{:15s}".format(
+                    #         list(computed), splitter_type, criterion,
+                    #         criteria,
+                    #     )
+                    # )
                     self.assertListEqual(expected, list(computed))
                     self.assertListEqual(
                         X[:, computed].tolist(), dataset.tolist()
@@ -1,8 +1,11 @@
+# type: ignore
 import os
 import unittest
+import warnings
 
 import numpy as np
-from sklearn.datasets import load_iris
+from sklearn.datasets import load_iris, load_wine
+from sklearn.exceptions import ConvergenceWarning
 
 from stree import Stree, Snode
 from .utils import load_dataset
@@ -39,10 +42,7 @@ class Stree_test(unittest.TestCase):
         _, count_u = np.unique(y_up, return_counts=True)
         #
         for i in unique_y:
-            try:
-                number_down = count_d[i]
-            except IndexError:
-                number_down = 0
+            number_down = count_d[i]
             try:
                 number_up = count_u[i]
             except IndexError:
@@ -59,33 +59,12 @@ class Stree_test(unittest.TestCase):
     def test_build_tree(self):
         """Check if the tree is built the same way as predictions of models
         """
-        import warnings
-
         warnings.filterwarnings("ignore")
         for kernel in self._kernels:
             clf = Stree(kernel=kernel, random_state=self._random_state)
             clf.fit(*load_dataset(self._random_state))
             self._check_tree(clf.tree_)
 
-    @staticmethod
-    def _find_out(px: np.array, x_original: np.array, y_original) -> list:
-        """Find the original values of y for a given array of samples
-
-        Arguments:
-            px {np.array} -- array of samples to search for
-            x_original {np.array} -- original dataset
-            y_original {[type]} -- original classes
-
-        Returns:
-            np.array -- classes of the given samples
-        """
-        res = []
-        for needle in px:
-            for row in range(x_original.shape[0]):
-                if all(x_original[row, :] == needle):
-                    res.append(y_original[row])
-        return res
-
     def test_single_prediction(self):
         X, y = load_dataset(self._random_state)
         for kernel in self._kernels:
@@ -102,22 +81,6 @@ class Stree_test(unittest.TestCase):
             yp = clf.fit(X, y).predict(X[:num, :])
             self.assertListEqual(y[:num].tolist(), yp.tolist())
 
-    def test_score(self):
-        X, y = load_dataset(self._random_state)
-        accuracies = [
-            0.9506666666666667,
-            0.9606666666666667,
-            0.9433333333333334,
-        ]
-        for kernel, accuracy_expected in zip(self._kernels, accuracies):
-            clf = Stree(random_state=self._random_state, kernel=kernel,)
-            clf.fit(X, y)
-            accuracy_score = clf.score(X, y)
-            yp = clf.predict(X)
-            accuracy_computed = np.mean(yp == y)
-            self.assertEqual(accuracy_score, accuracy_computed)
-            self.assertAlmostEqual(accuracy_expected, accuracy_score)
-
     def test_single_vs_multiple_prediction(self):
         """Check if predicting sample by sample gives the same result as
         predicting all samples at once
@@ -164,9 +127,6 @@ class Stree_test(unittest.TestCase):
 
     @staticmethod
     def test_is_a_sklearn_classifier():
-        import warnings
-        from sklearn.exceptions import ConvergenceWarning
-
         warnings.filterwarnings("ignore", category=ConvergenceWarning)
         warnings.filterwarnings("ignore", category=RuntimeWarning)
         from sklearn.utils.estimator_checks import check_estimator
@@ -239,6 +199,9 @@ class Stree_test(unittest.TestCase):
                 "min_distance linear": 0.9533333333333334,
                 "min_distance rbf": 0.836,
                 "min_distance poly": 0.9473333333333334,
+                "max_distance linear": 0.9533333333333334,
+                "max_distance rbf": 0.836,
+                "max_distance poly": 0.9473333333333334,
             },
             "Iris": {
                 "max_samples linear": 0.98,
@@ -247,11 +210,14 @@ class Stree_test(unittest.TestCase):
                 "min_distance linear": 0.98,
                 "min_distance rbf": 1.0,
                 "min_distance poly": 1.0,
+                "max_distance linear": 0.98,
+                "max_distance rbf": 1.0,
+                "max_distance poly": 1.0,
             },
         }
         for name, dataset in datasets.items():
             px, py = dataset
-            for criteria in ["max_samples", "min_distance"]:
+            for criteria in ["max_samples", "min_distance", "max_distance"]:
                 for kernel in self._kernels:
                     clf = Stree(
                         C=1e4,
@@ -274,7 +240,7 @@ class Stree_test(unittest.TestCase):
             (None, 16),
         ]
         clf = Stree()
-        clf.n_features_ = n_features
+        clf.n_features_in_ = n_features
         for max_features, expected in expected_values:
             clf.set_params(**dict(max_features=max_features))
             computed = clf._initialize_max_features()
@@ -322,13 +288,151 @@ class Stree_test(unittest.TestCase):
         with self.assertRaises(ValueError):
             clf.predict(X[:, :3])
 
+    # Tests of score
+
+    def test_score_binary(self):
+        X, y = load_dataset(self._random_state)
+        accuracies = [
+            0.9506666666666667,
+            0.9606666666666667,
+            0.9433333333333334,
+        ]
+        for kernel, accuracy_expected in zip(self._kernels, accuracies):
+            clf = Stree(random_state=self._random_state, kernel=kernel,)
+            clf.fit(X, y)
+            accuracy_score = clf.score(X, y)
+            yp = clf.predict(X)
+            accuracy_computed = np.mean(yp == y)
+            self.assertEqual(accuracy_score, accuracy_computed)
+            self.assertAlmostEqual(accuracy_expected, accuracy_score)
+
+    def test_score_max_features(self):
+        X, y = load_dataset(self._random_state)
+        clf = Stree(random_state=self._random_state, max_features=2)
+        clf.fit(X, y)
+        self.assertAlmostEqual(0.9426666666666667, clf.score(X, y))
+
+    def test_score_multi_class(self):
+        warnings.filterwarnings("ignore")
+        accuracies = [
+            0.8258427,  # Wine linear min_distance
+            0.6741573,  # Wine linear max_distance
+            0.8314607,  # Wine linear max_samples
+            0.6629213,  # Wine rbf min_distance
+            1.0000000,  # Wine rbf max_distance
+            0.4044944,  # Wine rbf max_samples
+            0.9157303,  # Wine poly min_distance
+            1.0000000,  # Wine poly max_distance
+            0.7640449,  # Wine poly max_samples
+            0.9933333,  # Iris linear min_distance
+            0.9666667,  # Iris linear max_distance
+            0.9666667,  # Iris linear max_samples
+            0.9800000,  # Iris rbf min_distance
+            0.9800000,  # Iris rbf max_distance
+            0.9800000,  # Iris rbf max_samples
+            1.0000000,  # Iris poly min_distance
+            1.0000000,  # Iris poly max_distance
+            1.0000000,  # Iris poly max_samples
+            0.8993333,  # Synthetic linear min_distance
+            0.6533333,  # Synthetic linear max_distance
+            0.9313333,  # Synthetic linear max_samples
+            0.8320000,  # Synthetic rbf min_distance
+            0.6660000,  # Synthetic rbf max_distance
+            0.8320000,  # Synthetic rbf max_samples
+            0.6066667,  # Synthetic poly min_distance
+            0.6840000,  # Synthetic poly max_distance
+            0.6340000,  # Synthetic poly max_samples
+        ]
+        datasets = [
+            ("Wine", load_wine(return_X_y=True)),
+            ("Iris", load_iris(return_X_y=True)),
+            (
+                "Synthetic",
+                load_dataset(self._random_state, n_classes=3, n_features=5),
+            ),
+        ]
+        for dataset_name, dataset in datasets:
+            X, y = dataset
+            for kernel in self._kernels:
+                for criteria in [
+                    "min_distance",
+                    "max_distance",
+                    "max_samples",
+                ]:
+                    clf = Stree(
+                        C=17,
+                        random_state=self._random_state,
+                        kernel=kernel,
+                        split_criteria=criteria,
+                        degree=5,
+                        gamma="auto",
+                    )
+                    clf.fit(X, y)
+                    accuracy_score = clf.score(X, y)
+                    yp = clf.predict(X)
+                    accuracy_computed = np.mean(yp == y)
+                    # print(
+                    #     "{:.7f}, # {:7} {:5} {}".format(
+                    #         accuracy_score, dataset_name, kernel, criteria
+                    #     )
+                    # )
+                    accuracy_expected = accuracies.pop(0)
+                    self.assertEqual(accuracy_score, accuracy_computed)
+                    self.assertAlmostEqual(accuracy_expected, accuracy_score)
+
     def test_bogus_splitter_parameter(self):
         clf = Stree(splitter="duck")
         with self.assertRaises(ValueError):
             clf.fit(*load_dataset())
 
+    def test_weights_removing_class(self):
+        # This patch solves an stderr message from sklearn svm lib
+        # "WARNING: class label x specified in weight is not found"
+        X = np.array(
+            [
+                [0.1, 0.1],
+                [0.1, 0.2],
+                [0.2, 0.1],
+                [5, 6],
+                [8, 9],
+                [6, 7],
+                [0.2, 0.2],
+            ]
+        )
+        y = np.array([0, 0, 0, 1, 1, 1, 0])
+        epsilon = 1e-5
+        weights = [1, 1, 1, 0, 0, 0, 1]
+        weights = np.array(weights, dtype="float64")
+        weights_epsilon = [x + epsilon for x in weights]
+        weights_no_zero = np.array([1, 1, 1, 0, 0, 2, 1])
+        original = weights_no_zero.copy()
+        clf = Stree()
+        clf.fit(X, y)
+        node = clf.train(X, y, weights, 1, "test",)
+        # if a class is lost with zero weights the patch adds epsilon
+        self.assertListEqual(weights.tolist(), weights_epsilon)
+        self.assertListEqual(node._sample_weight.tolist(), weights_epsilon)
+        # zero weights are ok when they don't erase a class
+        _ = clf.train(X, y, weights_no_zero, 1, "test")
+        self.assertListEqual(weights_no_zero.tolist(), original.tolist())
+
+    def test_build_predictor(self):
+        X, y = load_dataset(self._random_state)
+        clf = Stree(random_state=self._random_state)
+        with self.assertRaises(ValueError):
+            clf.tree_ = None
+            clf._build_predictor()
+        clf.fit(X, y)
+        node = clf.tree_.get_down().get_down()
+        expected_impurity = 0.04686951386893923
+        expected_class = 1
+        expected_belief = 0.9759887005649718
+        self.assertAlmostEqual(expected_impurity, node._impurity)
+        self.assertAlmostEqual(expected_belief, node._belief)
+        self.assertEqual(expected_class, node._class)
+        node._belief = 0.0
+        node._class = None
+        clf._build_predictor()
+        node = clf.tree_.get_down().get_down()
+        self.assertAlmostEqual(expected_belief, node._belief)
+        self.assertEqual(expected_class, node._class)
@@ -1,3 +1,4 @@
+# type: ignore
 from .Stree_test import Stree_test
 from .Snode_test import Snode_test
 from .Splitter_test import Splitter_test
@@ -1,3 +1,4 @@
+# type: ignore
 from sklearn.datasets import make_classification
 
 