Refactor predict and score and make the code pass mypy --strict

2020-07-01 18:37:10 +02:00
parent fa001f97a4
commit d1e30a3372
6 changed files with 106 additions and 128 deletions
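For reference, the strict run this commit targets is presumably something like mypy --strict stree/ (exact package path assumed); the "# type: ignore" comments added below silence third-party imports (numpy, sklearn) that shipped without type stubs.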

View File

@@ -6,25 +6,24 @@ __version__ = "0.9"
Build an oblique tree classifier based on SVM Trees
"""
from __future__ import annotations
import os
import random
import warnings
from typing import Optional, List, Union, Tuple
from math import log
from itertools import combinations
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.svm import SVC, LinearSVC
from sklearn.utils import check_consistent_length
from sklearn.utils.multiclass import check_classification_targets
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils.validation import (
check_X_y,
check_array,
import numpy as np # type: ignore
from sklearn.base import BaseEstimator, ClassifierMixin # type: ignore
from sklearn.svm import SVC, LinearSVC # type: ignore
from sklearn.utils.multiclass import ( # type: ignore
check_classification_targets,
)
from sklearn.exceptions import ConvergenceWarning # type: ignore
from sklearn.utils.validation import ( # type: ignore
check_is_fitted,
_check_sample_weight,
)
from sklearn.metrics._classification import _weighted_sum, _check_targets
class Snode:
@@ -34,7 +33,7 @@ class Snode:
def __init__(
self,
clf: SVC,
clf: Union[SVC, LinearSVC],
X: np.ndarray,
y: np.ndarray,
features: np.array,
@@ -42,24 +41,25 @@ class Snode:
title: str,
weight: np.ndarray = None,
):
self._clf = clf
self._title = title
self._belief = 0.0
self._clf: Union[SVC, LinearSVC] = clf
self._title: str = title
self._belief: float = 0.0
# Only store dataset in Testing
self._X = X if os.environ.get("TESTING", "NS") != "NS" else None
self._y = y
self._down = None
self._up = None
self._X: Optional[np.array] = X if os.environ.get(
"TESTING", "NS"
) != "NS" else None
self._y: np.array = y
self._down: Optional[Snode] = None
self._up: Optional[Snode] = None
self._class = None
self._feature = None
self._sample_weight = (
self._sample_weight: Optional[np.array] = (
weight if os.environ.get("TESTING", "NS") != "NS" else None
)
self._features = features
self._impurity = impurity
self._features: Tuple[int, ...] = features
self._impurity: float = impurity
@classmethod
def copy(cls, node: "Snode") -> "Snode":
def copy(cls, node: Snode) -> Snode:
return cls(
node._clf,
node._X,
@@ -69,22 +69,22 @@ class Snode:
node._title,
)
def set_down(self, son):
def set_down(self, son: Snode) -> None:
self._down = son
def set_up(self, son):
def set_up(self, son: Snode) -> None:
self._up = son
def is_leaf(self) -> bool:
return self._up is None and self._down is None
def get_down(self) -> Optional["Snode"]:
def get_down(self) -> Optional[Snode]:
return self._down
def get_up(self) -> Optional["Snode"]:
def get_up(self) -> Optional[Snode]:
return self._up
def make_predictor(self):
def make_predictor(self) -> None:
"""Compute the class of the predictor and its belief based on the
subdataset of the node only if it is a leaf
"""
@@ -143,21 +143,21 @@ class Siterator:
class Splitter:
def __init__(
self,
clf: SVC = None,
criterion: str = None,
splitter_type: str = None,
criteria: str = None,
min_samples_split: int = None,
random_state=None,
clf: Union[SVC, LinearSVC] = None,
criterion: str = "",
splitter_type: str = "",
criteria: str = "",
min_samples_split: int = 0,
random_state: Optional[int] = None,
):
self._clf: Union[SVC, LinearSVC] = clf
self._random_state = random_state
self._random_state: Optional[int] = random_state
if random_state is not None:
random.seed(random_state)
self._criterion = criterion
self._min_samples_split = min_samples_split
self._criteria = criteria
self._splitter_type = splitter_type
self._criterion: str = criterion
self._min_samples_split: int = min_samples_split
self._criteria: str = criteria
self._splitter_type: str = splitter_type
if clf is None:
raise ValueError(f"clf has to be a sklearn estimator, got({clf})")
@@ -186,7 +186,7 @@ class Splitter:
@staticmethod
def _gini(y: np.array) -> float:
_, count = np.unique(y, return_counts=True)
return 1 - np.sum(np.square(count / np.sum(count)))
return float(1 - np.sum(np.square(count / np.sum(count))))
@staticmethod
def _entropy(y: np.array) -> float:
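The body of _entropy is cut off by the hunk. A standard numpy formulation, as a sketch rather than the project's exact code:

    @staticmethod
    def _entropy(y: np.array) -> float:
        _, count = np.unique(y, return_counts=True)
        proportion = count / np.sum(count)
        return float(-np.sum(proportion * np.log2(proportion)))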
@@ -220,7 +220,7 @@ class Splitter:
if samples == 0:
return 0.0
else:
result = (
result = float(
imp_prev
- (card_up / samples) * imp_up
- (card_dn / samples) * imp_dn
@@ -228,10 +228,13 @@ class Splitter:
return result
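Worked example of the gain formula above: splitting a parent [0, 0, 1, 1] (gini 0.5) perfectly into [0, 0] and [1, 1] (gini 0.0 each) yields 0.5 - 0.5*0.0 - 0.5*0.0 = 0.5. As a toy check, reusing the _gini shown above:

    import numpy as np

    def gini(y: np.array) -> float:
        _, count = np.unique(y, return_counts=True)
        return float(1 - np.sum(np.square(count / np.sum(count))))

    parent, up, down = np.array([0, 0, 1, 1]), np.array([0, 0]), np.array([1, 1])
    gain = gini(parent) - (len(up) / len(parent)) * gini(up) \
        - (len(down) / len(parent)) * gini(down)
    assert gain == 0.5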
def _select_best_set(
self, dataset: np.array, labels: np.array, features_sets: list
) -> list:
self,
dataset: np.array,
labels: np.array,
features_sets: List[Tuple[int, ...]],
) -> Tuple[int, ...]:
max_gain: float = 0.0
selected: Union[List[int], None] = None
selected: Union[Tuple[int, ...], None] = None
warnings.filterwarnings("ignore", category=ConvergenceWarning)
for feature_set in features_sets:
self._clf.fit(dataset[:, feature_set], labels)
@@ -272,7 +275,7 @@ class Splitter:
return dataset[:, indices], indices
@staticmethod
def _min_distance(data: np.array, _) -> np.array:
def _min_distance(data: np.array, _: np.array) -> np.array:
"""Assign class to min distances
return a vector of classes so partition can separate class 0 from
@@ -288,7 +291,7 @@ class Splitter:
return np.argmin(data, axis=1)
@staticmethod
def _max_distance(data: np.array, _) -> np.array:
def _max_distance(data: np.array, _: np.array) -> np.array:
"""Assign class to max distances
return a vector of classes so partition can separate class 0 from
@@ -320,7 +323,7 @@ class Splitter:
selected = np.argmax(samples)
return data[:, selected]
def partition(self, samples: np.array, node: Snode):
def partition(self, samples: np.array, node: Snode) -> None:
"""Set the criteria to split arrays. Compute the indices of the samples
that should go to one side of the tree (down)
@@ -348,7 +351,7 @@ class Splitter:
"""
return node._clf.decision_function(data[:, node._features])
def part(self, origin: np.array) -> list:
def part(self, origin: np.array) -> Tuple[np.array, np.array]:
"""Split an array in two based on indices (down) and its complement
:param origin: dataset to split
@@ -359,13 +362,13 @@ class Splitter:
:rtype: Tuple[np.array, np.array]
"""
up = ~self._down
return [
return (
origin[up] if any(up) else None,
origin[self._down] if any(self._down) else None,
]
)
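A toy illustration of part's contract after this change, with self._down played by a standalone boolean mask (names assumed):

    import numpy as np

    origin = np.array([10, 20, 30, 40])
    down = np.array([False, True, False, True])  # stands in for self._down
    up = ~down
    result = (origin[up] if any(up) else None,
              origin[down] if any(down) else None)
    # result == (array([10, 30]), array([20, 40]))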
class Stree(BaseEstimator, ClassifierMixin):
class Stree(BaseEstimator, ClassifierMixin): # type: ignore
"""Estimator that is based on binary trees of svm nodes
can deal with sample_weights in predict, used in boosting sklearn methods
inheriting from BaseEstimator implements get_params and set_params methods
@@ -378,42 +381,34 @@ class Stree(BaseEstimator, ClassifierMixin):
C: float = 1.0,
kernel: str = "linear",
max_iter: int = 1000,
random_state: int = None,
max_depth: int = None,
random_state: Optional[int] = None,
max_depth: Optional[int] = None,
tol: float = 1e-4,
degree: int = 3,
gamma="scale",
gamma: Union[float, str] = "scale",
split_criteria: str = "max_samples",
criterion: str = "gini",
min_samples_split: int = 0,
max_features=None,
max_features: Optional[Union[str, int, float]] = None,
splitter: str = "random",
):
self.max_iter = max_iter
self.C = C
self.kernel = kernel
self.random_state = random_state
self.max_depth = max_depth
self.tol = tol
self.gamma = gamma
self.degree = degree
self.min_samples_split = min_samples_split
self.split_criteria = split_criteria
self.max_features = max_features
self.criterion = criterion
self.splitter = splitter
def _more_tags(self) -> dict:
"""Required by sklearn to supply features of the classifier
:return: the tag required
:rtype: dict
"""
return {"requires_y": True}
self.C: float = C
self.kernel: str = kernel
self.random_state: Optional[int] = random_state
self.max_depth: Optional[int] = max_depth
self.tol: float = tol
self.gamma: Union[float, str] = gamma
self.degree: int = degree
self.min_samples_split: int = min_samples_split
self.split_criteria: str = split_criteria
self.max_features: Union[str, int, float, None] = max_features
self.criterion: str = criterion
self.splitter: str = splitter
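As a usage sketch of the estimator (toy dataset; the stree import path is an assumption):

    from sklearn.datasets import load_iris
    from stree import Stree  # assumed import path

    X, y = load_iris(return_X_y=True)
    clf = Stree(kernel="linear", max_depth=3, random_state=0).fit(X, y)
    print(clf.score(X, y))  # accuracy via ClassifierMixin once score() is removed below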
def fit(
self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None
) -> "Stree":
) -> Stree:
"""Build the tree based on the dataset of samples and its labels
:param X: dataset of samples to make predictions
@@ -442,13 +437,11 @@ class Stree(BaseEstimator, ClassifierMixin):
f"Maximum depth has to be greater than 1... got (max_depth=\
{self.max_depth})"
)
check_classification_targets(y)
X, y = check_X_y(X, y)
X, y = self._validate_data(X, y)
sample_weight = _check_sample_weight(
sample_weight, X, dtype=np.float64
)
check_classification_targets(y)
# Initialize computed parameters
self.splitter_ = Splitter(
clf=self._build_clf(),
@@ -464,8 +457,6 @@ class Stree(BaseEstimator, ClassifierMixin):
self.n_classes_ = self.classes_.shape[0]
self.n_iter_ = self.max_iter
self.depth_ = 0
self.n_features_ = X.shape[1]
self.n_features_in_ = X.shape[1]
self.max_features_ = self._initialize_max_features()
self.tree_ = self.train(X, y, sample_weight, 1, "root")
self._build_predictor()
@@ -539,8 +530,16 @@ class Stree(BaseEstimator, ClassifierMixin):
title=title + ", <cgaf>",
weight=sample_weight,
)
node.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up"))
node.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down"))
node.set_up(
self.train( # type: ignore
X_U, y_u, sw_u, depth + 1, title + " - Up"
)
)
node.set_down(
self.train( # type: ignore
X_D, y_d, sw_d, depth + 1, title + " - Down"
)
)
return node
def _build_predictor(self) -> None:
@@ -611,26 +610,26 @@ class Stree(BaseEstimator, ClassifierMixin):
) -> np.array:
if xp is None:
return [], []
if node.is_leaf():
if node.is_leaf(): # type: ignore
# set a class for every sample in dataset
prediction = np.full((xp.shape[0], 1), node._class)
prediction = np.full(
(xp.shape[0], 1), node._class # type: ignore
)
return prediction, indices
self.splitter_.partition(xp, node)
self.splitter_.partition(xp, node) # type: ignore
x_u, x_d = self.splitter_.part(xp)
i_u, i_d = self.splitter_.part(indices)
prx_u, prin_u = predict_class(x_u, i_u, node.get_up())
prx_d, prin_d = predict_class(x_d, i_d, node.get_down())
prx_u, prin_u = predict_class(
x_u, i_u, node.get_up() # type: ignore
)
prx_d, prin_d = predict_class(
x_d, i_d, node.get_down() # type: ignore
)
return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
# sklearn check
check_is_fitted(self, ["tree_"])
check_is_fitted(self, "n_features_in_")
# Input validation
X = check_array(X)
if X.shape[1] != self.n_features_:
raise ValueError(
f"Expected {self.n_features_} features but got "
f"({X.shape[1]})"
)
X = self._validate_data(X, reset=False)
# setup prediction & make it happen
indices = np.arange(X.shape[0])
result = (
@@ -640,32 +639,6 @@ class Stree(BaseEstimator, ClassifierMixin):
)
return self.classes_[result]
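predict_class returns each branch's predictions together with the original row indices they belong to; a toy sketch of how the caller can scatter them back into input order (mechanics assumed, not the project's exact helper):

    import numpy as np

    predictions = np.array([1, 0, 0, 1])  # as concatenated by predict_class
    indices = np.array([2, 0, 3, 1])      # original row of each prediction
    ordered = np.empty_like(predictions)
    ordered[indices] = predictions        # scatter each prediction to its row
    # ordered == array([0, 1, 1, 0])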
def score(
self, X: np.array, y: np.array, sample_weight: np.array = None
) -> float:
"""Compute accuracy of the prediction
:param X: dataset of samples to make predictions
:type X: np.array
:param y: true labels of the samples
:type y: np.array
:param sample_weight: weights of the samples. Rescale C per sample.
Higher weights force the classifier to put more emphasis on these points
:type sample_weight: np.array, optional
:return: accuracy of the prediction
:rtype: float
"""
# sklearn check
check_is_fitted(self)
check_classification_targets(y)
X, y = check_X_y(X, y)
y_pred = self.predict(X).reshape(y.shape)
# Compute accuracy for each possible representation
_, y_true, y_pred = _check_targets(y, y_pred)
check_consistent_length(y_true, y_pred, sample_weight)
score = y_true == y_pred
return _weighted_sum(score, sample_weight, normalize=True)
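With score removed, scoring falls back to ClassifierMixin.score, i.e. plain (optionally weighted) accuracy. A minimal equivalent sketch:

    from typing import Optional
    import numpy as np

    def accuracy(y_true: np.ndarray, y_pred: np.ndarray,
                 sample_weight: Optional[np.ndarray] = None) -> float:
        correct = (y_true == y_pred).astype(float)
        return float(np.average(correct, weights=sample_weight))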
def __iter__(self) -> Siterator:
"""Create an iterator to be able to visit the nodes of the tree in
preorder, can make a list with all the nodes in preorder
@@ -693,11 +666,11 @@ class Stree(BaseEstimator, ClassifierMixin):
def _initialize_max_features(self) -> int:
if isinstance(self.max_features, str):
if self.max_features == "auto":
max_features = max(1, int(np.sqrt(self.n_features_)))
max_features = max(1, int(np.sqrt(self.n_features_in_)))
elif self.max_features == "sqrt":
max_features = max(1, int(np.sqrt(self.n_features_)))
max_features = max(1, int(np.sqrt(self.n_features_in_)))
elif self.max_features == "log2":
max_features = max(1, int(np.log2(self.n_features_)))
max_features = max(1, int(np.log2(self.n_features_in_)))
else:
raise ValueError(
"Invalid value for max_features. "
@@ -705,13 +678,13 @@ class Stree(BaseEstimator, ClassifierMixin):
"'sqrt' or 'log2'."
)
elif self.max_features is None:
max_features = self.n_features_
max_features = self.n_features_in_
elif isinstance(self.max_features, int):
max_features = self.max_features
else: # float
if self.max_features > 0.0:
max_features = max(
1, int(self.max_features * self.n_features_)
1, int(self.max_features * self.n_features_in_)
)
else:
raise ValueError(
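Sanity check of the max_features mapping above for n_features_in_ = 16 (the (None, 16) case appears in the test below):

    import numpy as np

    n = 16
    assert max(1, int(np.sqrt(n))) == 4   # "auto" and "sqrt"
    assert max(1, int(np.log2(n))) == 4   # "log2"
    assert max(1, int(0.5 * n)) == 8      # float fraction of the features
    # max_features=None maps to n itself: 16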

View File

@@ -1,3 +1,4 @@
# type: ignore
import os
import unittest

View File

@@ -1,3 +1,4 @@
# type: ignore
import os
import unittest
import random

View File

@@ -1,3 +1,4 @@
# type: ignore
import os
import unittest
import warnings
@@ -239,7 +240,7 @@ class Stree_test(unittest.TestCase):
(None, 16),
]
clf = Stree()
clf.n_features_ = n_features
clf.n_features_in_ = n_features
for max_features, expected in expected_values:
clf.set_params(**dict(max_features=max_features))
computed = clf._initialize_max_features()

View File

@@ -1,3 +1,4 @@
# type: ignore
from .Stree_test import Stree_test
from .Snode_test import Snode_test
from .Splitter_test import Splitter_test

View File

@@ -1,3 +1,4 @@
# type: ignore
from sklearn.datasets import make_classification