diff --git a/stree/Strees.py b/stree/Strees.py
index 5464768..21a05db 100644
--- a/stree/Strees.py
+++ b/stree/Strees.py
@@ -6,25 +6,24 @@ __version__ = "0.9"
 Build an oblique tree classifier based on SVM Trees
 """
 
+from __future__ import annotations
 import os
 import random
 import warnings
 from typing import Optional, List, Union, Tuple
 from math import log
 from itertools import combinations
-import numpy as np
-from sklearn.base import BaseEstimator, ClassifierMixin
-from sklearn.svm import SVC, LinearSVC
-from sklearn.utils import check_consistent_length
-from sklearn.utils.multiclass import check_classification_targets
-from sklearn.exceptions import ConvergenceWarning
-from sklearn.utils.validation import (
-    check_X_y,
-    check_array,
+import numpy as np  # type: ignore
+from sklearn.base import BaseEstimator, ClassifierMixin  # type: ignore
+from sklearn.svm import SVC, LinearSVC  # type: ignore
+from sklearn.utils.multiclass import (  # type: ignore
+    check_classification_targets,
+)
+from sklearn.exceptions import ConvergenceWarning  # type: ignore
+from sklearn.utils.validation import (  # type: ignore
     check_is_fitted,
     _check_sample_weight,
 )
-from sklearn.metrics._classification import _weighted_sum, _check_targets
 
 
 class Snode:
@@ -34,7 +33,7 @@ class Snode:
 
     def __init__(
         self,
-        clf: SVC,
+        clf: Union[SVC, LinearSVC],
         X: np.ndarray,
         y: np.ndarray,
         features: np.array,
@@ -42,24 +41,25 @@ class Snode:
         title: str,
         weight: np.ndarray = None,
     ):
-        self._clf = clf
-        self._title = title
-        self._belief = 0.0
+        self._clf: Union[SVC, LinearSVC] = clf
+        self._title: str = title
+        self._belief: float = 0.0
         # Only store dataset in Testing
-        self._X = X if os.environ.get("TESTING", "NS") != "NS" else None
-        self._y = y
-        self._down = None
-        self._up = None
+        self._X: Optional[np.array] = X if os.environ.get(
+            "TESTING", "NS"
+        ) != "NS" else None
+        self._y: np.array = y
+        self._down: Optional[Snode] = None
+        self._up: Optional[Snode] = None
         self._class = None
-        self._feature = None
-        self._sample_weight = (
+        self._sample_weight: Optional[np.array] = (
             weight if os.environ.get("TESTING", "NS") != "NS" else None
         )
-        self._features = features
-        self._impurity = impurity
+        self._features: Tuple[int, ...] = features
+        self._impurity: float = impurity
 
     @classmethod
-    def copy(cls, node: "Snode") -> "Snode":
+    def copy(cls, node: Snode) -> Snode:
         return cls(
             node._clf,
             node._X,
@@ -69,22 +69,22 @@ class Snode:
             node._title,
         )
 
-    def set_down(self, son):
+    def set_down(self, son: Snode) -> None:
         self._down = son
 
-    def set_up(self, son):
+    def set_up(self, son: Snode) -> None:
         self._up = son
 
     def is_leaf(self) -> bool:
         return self._up is None and self._down is None
 
-    def get_down(self) -> Optional["Snode"]:
+    def get_down(self) -> Optional[Snode]:
         return self._down
 
-    def get_up(self) -> Optional["Snode"]:
+    def get_up(self) -> Optional[Snode]:
         return self._up
 
-    def make_predictor(self):
+    def make_predictor(self) -> None:
         """Compute the class of the predictor and its belief based
         on the subdataset of the node only if it is a leaf
         """
@@ -143,21 +143,21 @@ class Siterator:
 class Splitter:
     def __init__(
         self,
-        clf: SVC = None,
-        criterion: str = None,
-        splitter_type: str = None,
-        criteria: str = None,
-        min_samples_split: int = None,
-        random_state=None,
+        clf: Union[SVC, LinearSVC] = None,
+        criterion: str = "",
+        splitter_type: str = "",
+        criteria: str = "",
+        min_samples_split: int = 0,
+        random_state: Optional[int] = None,
     ):
         self._clf: Union[SVC, LinearSVC] = clf
-        self._random_state = random_state
+        self._random_state: Optional[int] = random_state
         if random_state is not None:
             random.seed(random_state)
-        self._criterion = criterion
-        self._min_samples_split = min_samples_split
-        self._criteria = criteria
-        self._splitter_type = splitter_type
+        self._criterion: str = criterion
+        self._min_samples_split: int = min_samples_split
+        self._criteria: str = criteria
+        self._splitter_type: str = splitter_type
         if clf is None:
             raise ValueError(f"clf has to be a sklearn estimator, got({clf})")
 
@@ -186,7 +186,7 @@ class Splitter:
     @staticmethod
     def _gini(y: np.array) -> float:
         _, count = np.unique(y, return_counts=True)
-        return 1 - np.sum(np.square(count / np.sum(count)))
+        return float(1 - np.sum(np.square(count / np.sum(count))))
 
     @staticmethod
     def _entropy(y: np.array) -> float:
@@ -220,7 +220,7 @@ class Splitter:
         if samples == 0:
             return 0.0
         else:
-            result = (
+            result = float(
                 imp_prev
                 - (card_up / samples) * imp_up
                 - (card_dn / samples) * imp_dn
             )
@@ -228,10 +228,13 @@ class Splitter:
             return result
 
     def _select_best_set(
-        self, dataset: np.array, labels: np.array, features_sets: list
-    ) -> list:
+        self,
+        dataset: np.array,
+        labels: np.array,
+        features_sets: List[Tuple[int, ...]],
+    ) -> Tuple[int, ...]:
         max_gain: float = 0.0
-        selected: Union[List[int], None] = None
+        selected: Union[Tuple[int, ...], None] = None
         warnings.filterwarnings("ignore", category=ConvergenceWarning)
         for feature_set in features_sets:
             self._clf.fit(dataset[:, feature_set], labels)
@@ -272,7 +275,7 @@ class Splitter:
         return dataset[:, indices], indices
 
     @staticmethod
-    def _min_distance(data: np.array, _) -> np.array:
+    def _min_distance(data: np.array, _: np.array) -> np.array:
         """Assign class to min distances
 
         return a vector of classes so partition can separate class 0 from
@@ -288,7 +291,7 @@ class Splitter:
         return np.argmin(data, axis=1)
 
     @staticmethod
-    def _max_distance(data: np.array, _) -> np.array:
+    def _max_distance(data: np.array, _: np.array) -> np.array:
         """Assign class to max distances
 
         return a vector of classes so partition can separate class 0 from
@@ -320,7 +323,7 @@ class Splitter:
             selected = np.argmax(samples)
         return data[:, selected]
 
-    def partition(self, samples: np.array, node: Snode):
+    def partition(self, samples: np.array, node: Snode) -> None:
         """Set the criteria to split arrays. Compute the indices of the
         samples that should go to one side of the tree (down)
 
@@ -348,7 +351,7 @@ class Splitter:
         """
         return node._clf.decision_function(data[:, node._features])
 
-    def part(self, origin: np.array) -> list:
+    def part(self, origin: np.array) -> Tuple[np.array, np.array]:
         """Split an array in two based on indices (down) and its complement
 
         :param origin: dataset to split
@@ -359,13 +362,13 @@ class Splitter:
         :rtype: list
         """
         up = ~self._down
-        return [
+        return (
             origin[up] if any(up) else None,
             origin[self._down] if any(self._down) else None,
-        ]
+        )
 
 
-class Stree(BaseEstimator, ClassifierMixin):
+class Stree(BaseEstimator, ClassifierMixin):  # type: ignore
     """Estimator that is based on binary trees of svm nodes
     can deal with sample_weights in predict, used in boosting sklearn methods
     inheriting from BaseEstimator implements get_params and set_params methods
@@ -378,42 +381,34 @@ class Stree(BaseEstimator, ClassifierMixin):
         C: float = 1.0,
         kernel: str = "linear",
         max_iter: int = 1000,
-        random_state: int = None,
-        max_depth: int = None,
+        random_state: Optional[int] = None,
+        max_depth: Optional[int] = None,
         tol: float = 1e-4,
         degree: int = 3,
-        gamma="scale",
+        gamma: Union[float, str] = "scale",
         split_criteria: str = "max_samples",
         criterion: str = "gini",
         min_samples_split: int = 0,
-        max_features=None,
+        max_features: Optional[Union[str, int, float]] = None,
         splitter: str = "random",
     ):
         self.max_iter = max_iter
-        self.C = C
-        self.kernel = kernel
-        self.random_state = random_state
-        self.max_depth = max_depth
-        self.tol = tol
-        self.gamma = gamma
-        self.degree = degree
-        self.min_samples_split = min_samples_split
-        self.split_criteria = split_criteria
-        self.max_features = max_features
-        self.criterion = criterion
-        self.splitter = splitter
-
-    def _more_tags(self) -> dict:
-        """Required by sklearn to supply features of the classifier
-
-        :return: the tag required
-        :rtype: dict
-        """
-        return {"requires_y": True}
+        self.C: float = C
+        self.kernel: str = kernel
+        self.random_state: Optional[int] = random_state
+        self.max_depth: Optional[int] = max_depth
+        self.tol: float = tol
+        self.gamma: Union[float, str] = gamma
+        self.degree: int = degree
+        self.min_samples_split: int = min_samples_split
+        self.split_criteria: str = split_criteria
+        self.max_features: Union[str, int, float, None] = max_features
+        self.criterion: str = criterion
+        self.splitter: str = splitter
 
     def fit(
         self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None
-    ) -> "Stree":
+    ) -> Stree:
         """Build the tree based on the dataset of samples and its labels
 
         :param X: dataset of samples to make predictions
@@ -442,13 +437,11 @@ class Stree(BaseEstimator, ClassifierMixin):
                 f"Maximum depth has to be greater than 1... got (max_depth=\
 {self.max_depth})"
             )
-        check_classification_targets(y)
-        X, y = check_X_y(X, y)
+        X, y = self._validate_data(X, y)
         sample_weight = _check_sample_weight(
             sample_weight, X, dtype=np.float64
         )
-        check_classification_targets(y)
         # Initialize computed parameters
         self.splitter_ = Splitter(
             clf=self._build_clf(),
@@ -464,8 +457,6 @@ class Stree(BaseEstimator, ClassifierMixin):
         self.n_classes_ = self.classes_.shape[0]
         self.n_iter_ = self.max_iter
         self.depth_ = 0
-        self.n_features_ = X.shape[1]
-        self.n_features_in_ = X.shape[1]
         self.max_features_ = self._initialize_max_features()
         self.tree_ = self.train(X, y, sample_weight, 1, "root")
         self._build_predictor()
@@ -539,8 +530,16 @@ class Stree(BaseEstimator, ClassifierMixin):
             title=title + ", ",
             weight=sample_weight,
         )
-        node.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up"))
-        node.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down"))
+        node.set_up(
+            self.train(  # type: ignore
+                X_U, y_u, sw_u, depth + 1, title + " - Up"
+            )
+        )
+        node.set_down(
+            self.train(  # type: ignore
+                X_D, y_d, sw_d, depth + 1, title + " - Down"
+            )
+        )
         return node
 
     def _build_predictor(self) -> None:
@@ -611,26 +610,26 @@ class Stree(BaseEstimator, ClassifierMixin):
         ) -> np.array:
             if xp is None:
                 return [], []
-            if node.is_leaf():
+            if node.is_leaf():  # type: ignore
                 # set a class for every sample in dataset
-                prediction = np.full((xp.shape[0], 1), node._class)
+                prediction = np.full(
+                    (xp.shape[0], 1), node._class  # type: ignore
+                )
                 return prediction, indices
-            self.splitter_.partition(xp, node)
+            self.splitter_.partition(xp, node)  # type: ignore
            x_u, x_d = self.splitter_.part(xp)
             i_u, i_d = self.splitter_.part(indices)
-            prx_u, prin_u = predict_class(x_u, i_u, node.get_up())
-            prx_d, prin_d = predict_class(x_d, i_d, node.get_down())
+            prx_u, prin_u = predict_class(
+                x_u, i_u, node.get_up()  # type: ignore
+            )
+            prx_d, prin_d = predict_class(
+                x_d, i_d, node.get_down()  # type: ignore
+            )
             return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
 
-        # sklearn check
-        check_is_fitted(self, ["tree_"])
+        check_is_fitted(self, "n_features_in_")
         # Input validation
-        X = check_array(X)
-        if X.shape[1] != self.n_features_:
-            raise ValueError(
-                f"Expected {self.n_features_} features but got "
-                f"({X.shape[1]})"
-            )
+        X = self._validate_data(X, reset=False)
         # setup prediction & make it happen
         indices = np.arange(X.shape[0])
         result = (
@@ -640,32 +639,6 @@ class Stree(BaseEstimator, ClassifierMixin):
         )
         return self.classes_[result]
 
-    def score(
-        self, X: np.array, y: np.array, sample_weight: np.array = None
-    ) -> float:
-        """Compute accuracy of the prediction
-
-        :param X: dataset of samples to make predictions
-        :type X: np.array
-        :param y_true: samples labels
-        :type y_true: np.array
-        :param sample_weight: weights of the samples. Rescale C per sample.
-        Hi' weights force the classifier to put more emphasis on these points
-        :type sample_weight: np.array optional
-        :return: accuracy of the prediction
-        :rtype: float
-        """
-        # sklearn check
-        check_is_fitted(self)
-        check_classification_targets(y)
-        X, y = check_X_y(X, y)
-        y_pred = self.predict(X).reshape(y.shape)
-        # Compute accuracy for each possible representation
-        _, y_true, y_pred = _check_targets(y, y_pred)
-        check_consistent_length(y_true, y_pred, sample_weight)
-        score = y_true == y_pred
-        return _weighted_sum(score, sample_weight, normalize=True)
-
     def __iter__(self) -> Siterator:
         """Create an iterator to be able to visit the nodes of the tree in
         preorder, can make a list with all the nodes in preorder
@@ -693,11 +666,11 @@ class Stree(BaseEstimator, ClassifierMixin):
     def _initialize_max_features(self) -> int:
         if isinstance(self.max_features, str):
             if self.max_features == "auto":
-                max_features = max(1, int(np.sqrt(self.n_features_)))
+                max_features = max(1, int(np.sqrt(self.n_features_in_)))
             elif self.max_features == "sqrt":
-                max_features = max(1, int(np.sqrt(self.n_features_)))
+                max_features = max(1, int(np.sqrt(self.n_features_in_)))
             elif self.max_features == "log2":
-                max_features = max(1, int(np.log2(self.n_features_)))
+                max_features = max(1, int(np.log2(self.n_features_in_)))
             else:
                 raise ValueError(
                     "Invalid value for max_features. "
                     "Allowed string values are 'auto', "
@@ -705,13 +678,13 @@ class Stree(BaseEstimator, ClassifierMixin):
                     "'sqrt' or 'log2'."
                 )
         elif self.max_features is None:
-            max_features = self.n_features_
+            max_features = self.n_features_in_
         elif isinstance(self.max_features, int):
             max_features = self.max_features
         else:  # float
             if self.max_features > 0.0:
                 max_features = max(
-                    1, int(self.max_features * self.n_features_)
+                    1, int(self.max_features * self.n_features_in_)
                 )
             else:
                 raise ValueError(
diff --git a/stree/tests/Snode_test.py b/stree/tests/Snode_test.py
index 27e5d0a..0e096c1 100644
--- a/stree/tests/Snode_test.py
+++ b/stree/tests/Snode_test.py
@@ -1,3 +1,4 @@
+# type: ignore
 import os
 import unittest
 
diff --git a/stree/tests/Splitter_test.py b/stree/tests/Splitter_test.py
index 8417779..be04d71 100644
--- a/stree/tests/Splitter_test.py
+++ b/stree/tests/Splitter_test.py
@@ -1,3 +1,4 @@
+# type: ignore
 import os
 import unittest
 import random
diff --git a/stree/tests/Stree_test.py b/stree/tests/Stree_test.py
index e4715a6..37ade05 100644
--- a/stree/tests/Stree_test.py
+++ b/stree/tests/Stree_test.py
@@ -1,3 +1,4 @@
+# type: ignore
 import os
 import unittest
 import warnings
@@ -239,7 +240,7 @@ class Stree_test(unittest.TestCase):
             (None, 16),
         ]
         clf = Stree()
-        clf.n_features_ = n_features
+        clf.n_features_in_ = n_features
         for max_features, expected in expected_values:
             clf.set_params(**dict(max_features=max_features))
             computed = clf._initialize_max_features()
diff --git a/stree/tests/__init__.py b/stree/tests/__init__.py
index 32e7a88..8ead54d 100644
--- a/stree/tests/__init__.py
+++ b/stree/tests/__init__.py
@@ -1,3 +1,4 @@
+# type: ignore
 from .Stree_test import Stree_test
 from .Snode_test import Snode_test
 from .Splitter_test import Splitter_test
diff --git a/stree/tests/utils.py b/stree/tests/utils.py
index 94b0506..10fd6d1 100644
--- a/stree/tests/utils.py
+++ b/stree/tests/utils.py
@@ -1,3 +1,4 @@
+# type: ignore
 from sklearn.datasets import make_classification
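
Note on verifying the refactor above: with the custom score() deleted, accuracy now comes from sklearn's ClassifierMixin (which wraps predict()), and _validate_data() both records n_features_in_ during fit() and re-checks the feature count in predict() with reset=False, replacing the hand-rolled n_features_ bookkeeping. A minimal smoke-test sketch, assuming the package exports Stree at the top level (that import path is an assumption, not part of this patch) and reusing the make_classification helper the tests already import:

    # Hedged sketch, not part of the patch: exercises the paths the diff touches.
    from sklearn.datasets import make_classification
    from stree import Stree  # assumed public import path for stree/Strees.py

    X, y = make_classification(
        n_samples=200, n_features=16, n_informative=8, random_state=0
    )
    clf = Stree(random_state=0).fit(X, y)  # fit() runs self._validate_data(X, y)
    assert clf.n_features_in_ == 16  # attribute that replaces n_features_
    print(clf.predict(X[:5]))  # predict() re-validates X with reset=False
    print(clf.score(X, y))  # accuracy inherited from ClassifierMixin

If mypy is the checker these new "# type: ignore" markers target (the diff itself does not say), an alternative to per-import suppression would be setting ignore_missing_imports for sklearn.* in the mypy configuration; the inline comments keep the patch self-contained instead.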