""" __author__ = "Ricardo Montañana Gómez" __copyright__ = "Copyright 2020, Ricardo Montañana Gómez" __license__ = "MIT" __version__ = "0.9" Build an oblique tree classifier based on SVM nodes """ import os import numbers import random import warnings from math import log, factorial from typing import Optional import numpy as np from sklearn.base import BaseEstimator, ClassifierMixin from sklearn.svm import SVC, LinearSVC from sklearn.utils import check_consistent_length from sklearn.utils.multiclass import check_classification_targets from sklearn.exceptions import ConvergenceWarning from sklearn.utils.validation import ( check_X_y, check_array, check_is_fitted, _check_sample_weight, ) from sklearn.metrics._classification import _weighted_sum, _check_targets class Snode: """Nodes of the tree that keeps the svm classifier and if testing the dataset assigned to it """ def __init__( self, clf: SVC, X: np.ndarray, y: np.ndarray, features: np.array, impurity: float, title: str, weight: np.ndarray = None, ): self._clf = clf self._title = title self._belief = 0.0 # Only store dataset in Testing self._X = X if os.environ.get("TESTING", "NS") != "NS" else None self._y = y self._down = None self._up = None self._class = None self._feature = None self._sample_weight = ( weight if os.environ.get("TESTING", "NS") != "NS" else None ) self._features = features self._impurity = impurity self._partition_column: int = -1 @classmethod def copy(cls, node: "Snode") -> "Snode": return cls( node._clf, node._X, node._y, node._features, node._impurity, node._title, ) def set_partition_column(self, col: int): self._partition_column = col def get_partition_column(self) -> int: return self._partition_column def set_down(self, son): self._down = son def set_title(self, title): self._title = title def set_classifier(self, clf): self._clf = clf def set_features(self, features): self._features = features def set_impurity(self, impurity): self._impurity = impurity def get_title(self) -> str: return self._title def get_classifier(self) -> SVC: return self._clf def get_impurity(self) -> float: return self._impurity def get_features(self) -> np.array: return self._features def set_up(self, son): self._up = son def is_leaf(self) -> bool: return self._up is None and self._down is None def get_down(self) -> "Snode": return self._down def get_up(self) -> "Snode": return self._up def make_predictor(self): """Compute the class of the predictor and its belief based on the subdataset of the node only if it is a leaf """ if not self.is_leaf(): return classes, card = np.unique(self._y, return_counts=True) if len(classes) > 1: max_card = max(card) self._class = classes[card == max_card][0] self._belief = max_card / np.sum(card) else: self._belief = 1 try: self._class = classes[0] except IndexError: self._class = None def __str__(self) -> str: count_values = np.unique(self._y, return_counts=True) if self.is_leaf(): return ( f"{self._title} - Leaf class={self._class} belief=" f"{self._belief: .6f} impurity={self._impurity:.4f} " f"counts={count_values}" ) else: return ( f"{self._title} feaures={self._features} impurity=" f"{self._impurity:.4f} " f"counts={count_values}" ) class Siterator: """Stree preorder iterator""" def __init__(self, tree: Snode): self._stack = [] self._push(tree) def _push(self, node: Snode): if node is not None: self._stack.append(node) def __next__(self) -> Snode: if len(self._stack) == 0: raise StopIteration() node = self._stack.pop() self._push(node.get_up()) self._push(node.get_down()) return node class 
class Splitter:
    def __init__(
        self,
        clf: SVC = None,
        criterion: str = None,
        splitter_type: str = None,
        criteria: str = None,
        min_samples_split: int = None,
        random_state=None,
    ):
        self._clf = clf
        self._random_state = random_state
        if random_state is not None:
            random.seed(random_state)
        self._criterion = criterion
        self._min_samples_split = min_samples_split
        self._criteria = criteria
        self._splitter_type = splitter_type
        if clf is None:
            raise ValueError(f"clf has to be a sklearn estimator, got({clf})")
        if criterion not in ["gini", "entropy"]:
            raise ValueError(
                f"criterion must be gini or entropy got({criterion})"
            )
        if criteria not in [
            "max_samples",
            "impurity",
        ]:
            raise ValueError(
                f"criteria has to be max_samples or impurity; got ({criteria})"
            )
        if splitter_type not in ["random", "best"]:
            raise ValueError(
                f"splitter must be either random or best, got({splitter_type})"
            )
        self.criterion_function = getattr(self, f"_{self._criterion}")
        self.decision_criteria = getattr(self, f"_{self._criteria}")

    def partition_impurity(self, y: np.array) -> np.array:
        return self.criterion_function(y)

    @staticmethod
    def _gini(y: np.array) -> float:
        _, count = np.unique(y, return_counts=True)
        return 1 - np.sum(np.square(count / np.sum(count)))

    @staticmethod
    def _entropy(y: np.array) -> float:
        """Compute the entropy of a set of labels

        Parameters
        ----------
        y : np.array
            set of labels

        Returns
        -------
        float
            entropy
        """
        n_labels = len(y)
        if n_labels <= 1:
            return 0
        counts = np.bincount(y)
        proportions = counts / n_labels
        n_classes = np.count_nonzero(proportions)
        if n_classes <= 1:
            return 0
        entropy = 0.0
        # Compute standard entropy (log base = number of classes present)
        for prop in proportions:
            if prop != 0.0:
                entropy -= prop * log(prop, n_classes)
        return entropy

    def information_gain(
        self, labels: np.array, labels_up: np.array, labels_dn: np.array
    ) -> float:
        """Compute the information gain of a split candidate

        Parameters
        ----------
        labels : np.array
            labels of the dataset
        labels_up : np.array
            labels of one side of the split
        labels_dn : np.array
            labels of the other side of the split

        Returns
        -------
        float
            information gain
        """
        imp_prev = self.criterion_function(labels)
        card_up = card_dn = imp_up = imp_dn = 0
        if labels_up is not None:
            card_up = labels_up.shape[0]
            imp_up = self.criterion_function(labels_up)
        if labels_dn is not None:
            card_dn = labels_dn.shape[0]
            imp_dn = self.criterion_function(labels_dn)
        samples = card_up + card_dn
        if samples == 0:
            return 0.0
        return (
            imp_prev
            - (card_up / samples) * imp_up
            - (card_dn / samples) * imp_dn
        )

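    # Worked example (illustrative, assuming criterion="gini"): the labels
    # [0, 0, 1, 1] have impurity 1 - (0.5**2 + 0.5**2) = 0.5. A perfect
    # split into [0, 0] and [1, 1] leaves both sides with impurity 0, so
    #     splitter.information_gain(
    #         np.array([0, 0, 1, 1]), np.array([0, 0]), np.array([1, 1])
    #     )
    # returns 0.5 - (2/4) * 0 - (2/4) * 0 = 0.5.
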
    def _select_best_set(
        self, dataset: np.array, labels: np.array, features_sets: list
    ) -> list:
        max_gain = 0
        selected = None
        warnings.filterwarnings("ignore", category=ConvergenceWarning)
        for feature_set in features_sets:
            self._clf.fit(dataset[:, feature_set], labels)
            node = Snode(
                self._clf, dataset, labels, feature_set, 0.0, "subset"
            )
            self.partition(dataset, node, train=True)
            y1, y2 = self.part(labels)
            gain = self.information_gain(labels, y1, y2)
            if gain > max_gain:
                max_gain = gain
                selected = feature_set
        # if no candidate improves the gain, keep the last one evaluated
        return selected if selected is not None else feature_set

    @staticmethod
    def _generate_spaces(features: int, max_features: int) -> list:
        """Generate at most 5 random feature combinations

        Parameters
        ----------
        features : int
            number of features in the dataset
        max_features : int
            number of features in each combination

        Returns
        -------
        list
            list with up to 5 combinations of features randomly selected
        """
        comb = set()
        # Generate at most 5 combinations
        if max_features == features:
            set_length = 1
        else:
            number = factorial(features) / (
                factorial(max_features) * factorial(features - max_features)
            )
            set_length = min(5, number)
        while len(comb) < set_length:
            comb.add(
                tuple(sorted(random.sample(range(features), max_features)))
            )
        return list(comb)

    def _get_subspaces_set(
        self, dataset: np.array, labels: np.array, max_features: int
    ) -> np.array:
        """Compute the indices of the features selected by the splitter,
        depending on the self._splitter_type hyperparameter

        Parameters
        ----------
        dataset : np.array
            array of samples
        labels : np.array
            labels of the dataset
        max_features : int
            number of features of the subspace
            (<= number of features in dataset)

        Returns
        -------
        np.array
            indices of the features selected
        """
        features_sets = self._generate_spaces(dataset.shape[1], max_features)
        if len(features_sets) > 1:
            if self._splitter_type == "random":
                index = random.randint(0, len(features_sets) - 1)
                return features_sets[index]
            else:
                return self._select_best_set(dataset, labels, features_sets)
        else:
            return features_sets[0]

    def get_subspace(
        self, dataset: np.array, labels: np.array, max_features: int
    ) -> tuple:
        """Return a subspace of the selected dataset of max_features length,
        chosen according to the splitter_type hyperparameter

        Parameters
        ----------
        dataset : np.array
            array of samples (# samples, # features)
        labels : np.array
            labels of the dataset
        max_features : int
            number of features to form the subspace

        Returns
        -------
        tuple
            tuple with the dataset restricted to the selected features and
            the indices of the features selected
        """
        indices = self._get_subspaces_set(dataset, labels, max_features)
        return dataset[:, indices], indices

    def _impurity(self, data: np.array, y: np.array) -> np.array:
        """Return the column of the dataset to be taken into account to
        split the dataset: the one yielding the best information gain

        Parameters
        ----------
        data : np.array
            distances to the hyperplane of every class
        y : np.array
            vector of labels (classes)

        Returns
        -------
        np.array
            column of the dataset to be taken into account to split the
            dataset
        """
        max_gain = 0
        selected = -1
        for col in range(data.shape[1]):
            tup = y[data[:, col] > 0]
            tdn = y[data[:, col] <= 0]
            info_gain = self.information_gain(y, tup, tdn)
            if info_gain > max_gain:
                selected = col
                max_gain = info_gain
        return selected

    @staticmethod
    def _max_samples(data: np.array, y: np.array) -> np.array:
        """Return the column of the dataset to be taken into account to
        split the dataset: the class with the most samples

        Parameters
        ----------
        data : np.array
            distances to the hyperplane of every class
        y : np.array
            vector of labels (classes)

        Returns
        -------
        np.array
            column of the dataset to be taken into account to split the
            dataset
        """
        # select the class with the max number of samples
        _, samples = np.unique(y, return_counts=True)
        return np.argmax(samples)

    def partition(self, samples: np.array, node: Snode, train: bool):
        """Set the criteria to split arrays. Compute the indices of the
        samples that should go to one side of the tree (up)
        """
        # data contains the distances of every sample to every class
        # hyperplane: an array of shape (m, nc), nc = # of classes
        data = self._distances(node, samples)
        if data.shape[0] < self._min_samples_split:
            # there aren't enough samples to split
            self._up = np.ones((data.shape[0]), dtype=bool)
            return
        if data.ndim > 1:
            # split criteria for multiclass
            # Convert data to a (m, 1) array selecting values for samples
            if train:
                # at train time we have to compute the column to take into
                # account to split the dataset
                col = self.decision_criteria(data, node._y)
                node.set_partition_column(col)
            else:
                # at predict time just use the column computed at train time,
                # i.e. the one taking the classifier of the class
                col = node.get_partition_column()
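
# Illustrative usage sketch (the toy data `X` and `y` below are invented
# for the example; splitter_type="random" avoids the partition path, whose
# helpers are defined further down in the full module):
#     splitter = Splitter(
#         clf=LinearSVC(random_state=0),
#         criterion="gini",
#         splitter_type="random",
#         criteria="max_samples",
#         min_samples_split=2,
#         random_state=0,
#     )
#     X = np.random.rand(10, 4)
#     y = np.array([0] * 5 + [1] * 5)
#     Xs, idx = splitter.get_subspace(X, y, max_features=2)
#     # Xs holds only the two selected columns; idx are their indices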