from math import log, sqrt
from sys import float_info
from itertools import combinations

import numpy as np


class Metrics:
    @staticmethod
    def conditional_entropy(x, y, base=2):
        """Quantify the amount of information needed to describe the outcome
        of y given that the value of x is known, i.e. compute H(y|x)

        Parameters
        ----------
        x : np.array
            values of the variable
        y : np.array
            array of labels
        base : int, optional
            base of the logarithm, by default 2

        Returns
        -------
        float
            conditional entropy of y given x
        """
        xy = np.c_[x, y]
        return Metrics.entropy(xy, base) - Metrics.entropy(x, base)

    @staticmethod
    def entropy(y, base=2):
        """Measure of the uncertainty in predicting the value of y

        Parameters
        ----------
        y : np.array
            array of labels
        base : int, optional
            base of the logarithm, by default 2

        Returns
        -------
        float
            entropy of y
        """
        _, count = np.unique(y, return_counts=True, axis=0)
        proba = count.astype(float) / len(y)
        proba = proba[proba > 0.0]
        return np.sum(proba * np.log(1.0 / proba)) / log(base)

    @staticmethod
    def information_gain(x, y, base=2):
        """Measure the reduction in uncertainty about the value of y when the
        value of x is known (also called mutual information)
        (https://www.sciencedirect.com/science/article/pii/S0020025519303603)

        Parameters
        ----------
        x : np.array
            values of the variable
        y : np.array
            array of labels
        base : int, optional
            base of the logarithm, by default 2

        Returns
        -------
        float
            information gained
        """
        return Metrics.entropy(y, base) - Metrics.conditional_entropy(
            x, y, base
        )

    @staticmethod
    def symmetrical_uncertainty(x, y):
        """Compute symmetrical uncertainty, i.e. information gain (mutual
        information) normalized by the entropies of both variables, to
        compensate for the bias toward high-cardinality features.
        Range: [0, 1]
        (https://www.sciencedirect.com/science/article/pii/S0020025519303603)

        Parameters
        ----------
        x : np.array
            values of the variable
        y : np.array
            array of labels

        Returns
        -------
        float
            symmetrical uncertainty
        """
        return (
            2.0
            * Metrics.information_gain(x, y)
            / (Metrics.entropy(x) + Metrics.entropy(y))
        )
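
# Quick sanity checks for the metrics above (an illustrative sketch, not part
# of the original module; the arrays are made-up examples).
def _demo_metrics():
    # a fair binary variable carries exactly 1 bit of entropy
    y = np.array([0, 1, 0, 1])
    assert abs(Metrics.entropy(y, base=2) - 1.0) < 1e-12
    # a feature identical to the labels has symmetrical uncertainty 1 ...
    assert abs(Metrics.symmetrical_uncertainty(y, y) - 1.0) < 1e-12
    # ... while a feature independent of the labels has symmetrical
    # uncertainty 0
    x = np.array([0, 0, 1, 1])
    assert abs(Metrics.symmetrical_uncertainty(x, y)) < 1e-12
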
class MFS:
    """Compute Fast Correlation-Based Filter as described in
    Yu, L. and Liu, H.; "Feature Selection for High-Dimensional Data: A Fast
    Correlation Based Filter Solution", Proc. 20th Intl. Conf. Mach. Learn.
    (ICML-2003),
    and Correlation-based Feature Selection as described in
    "Correlation-based Feature Selection for Machine Learning" by
    Mark A. Hall

    Parameters
    ----------
    max_features: int
        The maximum number of features to return
    """

    def __init__(self, max_features):
        self._initialize()
        self._max_features = max_features

    def _initialize(self):
        """Initialize the attributes to support multiple calls on the same
        object
        """
        self._result = None
        self._scores = []
        self._su_labels = None
        self._su_features = {}

    def _compute_su_labels(self):
        """Compute the symmetrical uncertainty between each feature of the
        dataset and the labels, and store it for future calls

        Returns
        -------
        np.array
            vector with the symmetrical uncertainty of every feature and the
            labels
        """
        if self._su_labels is None:
            num_features = self.X_.shape[1]
            self._su_labels = np.zeros(num_features)
            for col in range(num_features):
                self._su_labels[col] = Metrics.symmetrical_uncertainty(
                    self.X_[:, col], self.y_
                )
        return self._su_labels

    def _compute_su_features(self, feature_a, feature_b):
        """Compute the symmetrical uncertainty between two features and store
        it for future calls

        Parameters
        ----------
        feature_a : int
            index of the first feature
        feature_b : int
            index of the second feature

        Returns
        -------
        float
            the symmetrical uncertainty of the two features
        """
        if (feature_a, feature_b) not in self._su_features:
            self._su_features[
                (feature_a, feature_b)
            ] = Metrics.symmetrical_uncertainty(
                self.X_[:, feature_a], self.X_[:, feature_b]
            )
        return self._su_features[(feature_a, feature_b)]

    def _compute_merit(self, features):
        """Compute the merit function of the CFS algorithm

        Parameters
        ----------
        features : list
            list of features to include in the computation

        Returns
        -------
        float
            the merit of the feature set passed
        """
        # lgtm has already recognized that this is a false positive
        rcf = self._su_labels[
            features  # lgtm [py/hash-unhashable-value]
        ].sum()
        rff = 0.0
        k = len(features)
        for pair in combinations(features, 2):
            rff += self._compute_su_features(*pair)
        # Hall's merit is k * mean(rcf) / sqrt(k + k(k - 1) * mean(rff));
        # rcf sums the k feature-class correlations and rff sums the
        # k(k - 1) / 2 feature pairs, so k(k - 1) * mean(rff) equals 2 * rff
        return rcf / sqrt(k + 2.0 * rff)
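    # Worked example of the merit above (illustrative numbers, not from the
    # paper): with k=2, su(f0, y)=0.5, su(f1, y)=0.3 and su(f0, f1)=0.2,
    # rcf = 0.8 and rff = 0.2, so the merit is
    #   0.8 / sqrt(2 + 2 * 0.2) = 0.8 / sqrt(2.4) ≈ 0.516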
    def cfs(self, X, y):
        """Correlation-based Feature Selection with a forward best-first
        heuristic search

        Parameters
        ----------
        X : np.array
            array of features
        y : np.array
            vector of labels

        Returns
        -------
        self
            self
        """
        self._initialize()
        self.X_ = X
        self.y_ = y
        s_list = self._compute_su_labels()
        # Descending order
        feature_order = (-s_list).argsort().tolist()
        continue_condition = True
        candidates = []
        # start with the best feature (max symmetrical uncertainty wrt label)
        first_candidate = feature_order.pop(0)
        candidates.append(first_candidate)
        self._scores.append(s_list[first_candidate])
        while continue_condition:
            merit = float_info.min
            id_selected = None
            for idx, feature in enumerate(feature_order):
                candidates.append(feature)
                merit_new = self._compute_merit(candidates)
                if merit_new > merit:
                    id_selected = idx
                    merit = merit_new
                candidates.pop()
            candidates.append(feature_order[id_selected])
            self._scores.append(merit)
            del feature_order[id_selected]
            if (
                len(feature_order) == 0
                or len(candidates) == self._max_features
            ):
                # Force leaving the loop
                continue_condition = False
            if len(self._scores) >= 5:
                """
                "To prevent the best first search from exploring the entire
                feature subset search space, a stopping criterion is imposed.
                The search will terminate if five consecutive fully expanded
                subsets show no improvement over the current best subset."
                as stated in Mark A. Hall's thesis
                """
                item_ant = -1
                for item in self._scores[-5:]:
                    if item_ant == -1:
                        item_ant = item
                    if item > item_ant:
                        break
                    else:
                        item_ant = item
                else:
                    continue_condition = False
        self._result = candidates
        return self

    def fcbf(self, X, y, threshold):
        """Fast Correlation-Based Filter

        Parameters
        ----------
        X : np.array
            array of features
        y : np.array
            vector of labels
        threshold : float
            threshold to select relevant features

        Returns
        -------
        self
            self

        Raises
        ------
        ValueError
            if the threshold is less than the minimum allowed value of 1e-7
        """
        if threshold < 1e-7:
            raise ValueError("Threshold cannot be less than 1e-7")
        self._initialize()
        self.X_ = X
        self.y_ = y
        s_list = self._compute_su_labels()
        # Descending order
        feature_order = (-s_list).argsort()
        feature_dup = feature_order.copy().tolist()
        self._result = []
        for index_p in feature_order:
            # Don't self compare
            feature_dup.pop(0)
            if s_list[index_p] == 0.0:
                # the feature has already been removed as redundant
                continue
            if s_list[index_p] < threshold:
                break
            # Remove redundant features
            for index_q in feature_dup:
                su_pq = self._compute_su_features(index_p, index_q)
                if su_pq >= s_list[index_q]:
                    # remove feature from the list
                    s_list[index_q] = 0.0
            self._result.append(index_p)
            self._scores.append(s_list[index_p])
            if len(self._result) == self._max_features:
                break
        return self

    def get_results(self):
        """Return the results of the algorithm applied, if any

        Returns
        -------
        list
            list of indices of the selected features
        """
        return self._result

    def get_scores(self):
        """Return the scores computed for the selected features

        Returns
        -------
        list
            list of scores of the selected features
        """
        return self._scores
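
# Minimal usage sketch (not part of the original module): exercises both
# selectors on a synthetic dataset of discrete features where the labels
# depend on the first two columns only. All values below are assumed.
if __name__ == "__main__":
    _demo_metrics()  # sanity-check the metrics first
    rng = np.random.default_rng(seed=0)
    X = rng.integers(0, 3, size=(200, 6))
    y = (X[:, 0] + X[:, 1]) % 3
    selector = MFS(max_features=3)
    print("CFS selected :", selector.cfs(X, y).get_results())
    print("CFS scores   :", selector.get_scores())
    print("FCBF selected:", selector.fcbf(X, y, threshold=1e-4).get_results())
    print("FCBF scores  :", selector.get_scores())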