diff --git a/mfs/k.py b/k.py
similarity index 66%
rename from mfs/k.py
rename to k.py
index 1dfce73..5ef993d 100644
--- a/mfs/k.py
+++ b/k.py
@@ -1,3 +1,4 @@
+import warnings
 from sklearn.datasets import load_wine
 from mfs import MFS
 from mfs.Metrics import Metrics
@@ -7,11 +8,9 @@ mfsc = MFS(discrete=False)
 mfsd = MFS(discrete=True)
 X, y = load_wine(return_X_y=True)
 m, n = X.shape
-
 print("* Differential entropy in X")
 for i in range(n):
     print(i, Metrics.differential_entropy(X[:, i], k=10))
-
 print("* Information Gain")
 print("- Discrete features")
 print(Metrics.information_gain(X, y))
@@ -21,19 +20,24 @@ print("- Continuous features")
 print(Metrics.information_gain_cont(X, y))
 for i in range(n):
     print(i, Metrics.information_gain_cont(X[:, i], y))
+# Classification
+warnings.filterwarnings("ignore")
 print("CFS Discrete")
-print(mfsd.cfs(X, y).get_results())
+cfs_d = mfsd.cfs(X, y).get_results()
+print(cfs_d)
 print("CFS continuous")
 cfs_f = mfsc.cfs(X, y).get_results()
 print(cfs_f)
 print("FCBF Discrete")
-print(mfsd.fcbf(X, y, 1e-7).get_results())
+print(mfsd.fcbf(X, y, 5e-2).get_results())
 print("FCBF continuous")
-fcfb_f = mfsc.fcbf(X, y, 1e-7).get_results()
-print(fcfb_f)
+fcfb_f = mfsc.fcbf(X, y, 5e-2).get_results()
+print(fcfb_f, len(fcfb_f), "X.shape=", X.shape)
 clf = Stree(random_state=0)
 print("completo", clf.fit(X, y).score(X, y))
 clf = Stree(random_state=0)
-print("cfs", clf.fit(X[:, cfs_f], y).score(X[:, cfs_f], y))
+print("cfs discreto", clf.fit(X[:, cfs_d], y).score(X[:, cfs_d], y))
+print("cfs continuo", clf.fit(X[:, cfs_f], y).score(X[:, cfs_f], y))
 clf = Stree(random_state=0)
-print("fcfb", clf.fit(X[:, fcfb_f], y).score(X[:, fcfb_f], y))
+subf = fcfb_f[:6]
+print("fcfb", clf.fit(X[:, subf], y).score(X[:, subf], y))
diff --git a/mfs/Metrics.py b/mfs/Metrics.py
index 37f17cd..c91f705 100755
--- a/mfs/Metrics.py
+++ b/mfs/Metrics.py
@@ -5,8 +5,6 @@ from scipy.special import digamma, gamma, psi
 from sklearn.neighbors import BallTree, KDTree
 from sklearn.neighbors import NearestNeighbors
 
-# from sklearn.feature_selection._mutual_info import _compute_mi
-
 
 class Metrics:
     @staticmethod
@@ -27,13 +25,16 @@ class Metrics:
         float
             Information gained
         """
-        # return _compute_mi(
-        #     x, y, x_discrete=False, y_discrete=True, n_neighbors=3
-        # )
         return Metrics._compute_mi_cd(x, y, n_neighbors=3)
 
+    @staticmethod
     def _compute_mi_cd(c, d, n_neighbors):
-        """Compute mutual information between continuous and discrete variables.
+        """Compute mutual information between continuous and discrete
+        variables.
+
+        # Author: Nikolay Mayorov
+        # License: 3-clause BSD
+
 
         Parameters
         ----------
@@ -54,10 +55,10 @@ class Metrics:
 
         Notes
         -----
-        True mutual information can't be negative. If its estimate by a numerical
-        method is negative, it means (providing the method is adequate) that the
-        mutual information is close to 0 and replacing it by 0 is a reasonable
-        strategy.
+        True mutual information can't be negative. If its estimate by a
+        numerical method is negative, it means (providing the method is
+        adequate) that the mutual information is close to 0 and replacing it
+        by 0 is a reasonable strategy.
 
         References
         ----------
@@ -67,7 +68,6 @@ class Metrics:
         n_samples = c.shape[0]
         if c.ndim == 1:
             c = c.reshape((-1, 1))
-
         radius = np.empty(n_samples)
         label_counts = np.empty(n_samples)
         k_all = np.empty(n_samples)
@@ -83,7 +83,6 @@ class Metrics:
                 radius[mask] = np.nextafter(r[:, -1], 0)
                 k_all[mask] = k
             label_counts[mask] = count
-
         # Ignore points with unique labels.
         mask = label_counts > 1
         n_samples = np.sum(mask)
@@ -91,8 +90,6 @@ class Metrics:
         k_all = k_all[mask]
         c = c[mask]
         radius = radius[mask]
-
-        # kd = KDTree(c)
         kd = (
             BallTree(c, metric="chebyshev")
             if n_samples >= 20
@@ -102,7 +99,6 @@ class Metrics:
             c, radius, count_only=True, return_distance=False
         )
         m_all = np.array(m_all) - 1.0
-
         mi = (
             digamma(n_samples)
             + np.mean(digamma(k_all))
@@ -126,7 +122,6 @@ class Metrics:
     @staticmethod
     def differential_entropy(x, k=1):
-
         """Returns the entropy of the X.
 
         Parameters
         ===========
@@ -167,31 +162,6 @@ class Metrics:
             - psi(k)
         )
 
-    @staticmethod
-    def conditional_differential_entropy(x, y):
-        """quantifies the amount of information needed to describe the outcome
-        of Y discrete given that the value of X continuous is known
-        computes H(Y|X)
-
-        Parameters
-        ----------
-        x : np.array
-            values of the continuous variable
-        y : np.array
-            array of labels
-        base : int, optional
-            base of the logarithm, by default 2
-
-        Returns
-        -------
-        float
-            conditional entropy of y given x
-        """
-        xy = np.c_[x, y]
-        return Metrics.differential_entropy(xy) - Metrics.differential_entropy(
-            x
-        )
-
     @staticmethod
     def symmetrical_unc_continuous(x, y):
         """Compute symmetrical uncertainty. Using Greg Ver Steeg's npeet
diff --git a/mfs/entropy_estimators.py b/mfs/entropy_estimators.py
deleted file mode 100755
index e0c1630..0000000
--- a/mfs/entropy_estimators.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/env python
-# Written by Greg Ver Steeg
-# See readme.pdf for documentation
-# Or go to http://www.isi.edu/~gregv/npeet.html
-
-import warnings
-
-import numpy as np
-import numpy.linalg as la
-from numpy import log
-from scipy.special import digamma
-from sklearn.neighbors import BallTree, KDTree
-
-# CONTINUOUS ESTIMATORS
-
-
-def entropy(x, k=3, base=2):
-    """The classic K-L k-nearest neighbor continuous entropy estimator
-    x should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
-    if x is a one-dimensional scalar and we have four samples
-    """
-    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
-    x = np.asarray(x)
-    n_elements, n_features = x.shape
-    x = add_noise(x)
-    tree = build_tree(x)
-    nn = query_neighbors(tree, x, k)
-    const = digamma(n_elements) - digamma(k) + n_features * log(2)
-    return (const + n_features * np.log(nn).mean()) / log(base)
-
-
-def centropy(x, y, k=3, base=2):
-    """The classic K-L k-nearest neighbor continuous entropy estimator for the
-    entropy of X conditioned on Y.
-    """
-    xy = np.c_[x, y]
-    entropy_union_xy = entropy(xy, k=k, base=base)
-    entropy_y = entropy(y, k=k, base=base)
-    return entropy_union_xy - entropy_y
-
-
-def tc(xs, k=3, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    entropy_features = [entropy(col, k=k, base=base) for col in xs_columns]
-    return np.sum(entropy_features) - entropy(xs, k, base)
-
-
-def ctc(xs, y, k=3, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    centropy_features = [
-        centropy(col, y, k=k, base=base) for col in xs_columns
-    ]
-    return np.sum(centropy_features) - centropy(xs, y, k, base)
-
-
-def corex(xs, ys, k=3, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    cmi_features = [mi(col, ys, k=k, base=base) for col in xs_columns]
-    return np.sum(cmi_features) - mi(xs, ys, k=k, base=base)
-
-
-def mi(x, y, z=None, k=3, base=2, alpha=0):
-    """Mutual information of x and y (conditioned on z if z is not None)
-    x, y should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
-    if x is a one-dimensional scalar and we have four samples
-    """
-    assert len(x) == len(y), "Arrays should have same length"
-    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
-    x, y = np.asarray(x), np.asarray(y)
-    x, y = x.reshape(x.shape[0], -1), y.reshape(y.shape[0], -1)
-    x = add_noise(x)
-    y = add_noise(y)
-    points = [x, y]
-    if z is not None:
-        z = np.asarray(z)
-        z = z.reshape(z.shape[0], -1)
-        points.append(z)
-    points = np.hstack(points)
-    # Find nearest neighbors in joint space, p=inf means max-norm
-    tree = build_tree(points)
-    dvec = query_neighbors(tree, points, k)
-    if z is None:
-        a, b, c, d = (
-            avgdigamma(x, dvec),
-            avgdigamma(y, dvec),
-            digamma(k),
-            digamma(len(x)),
-        )
-        if alpha > 0:
-            d += lnc_correction(tree, points, k, alpha)
-    else:
-        xz = np.c_[x, z]
-        yz = np.c_[y, z]
-        a, b, c, d = (
-            avgdigamma(xz, dvec),
-            avgdigamma(yz, dvec),
-            avgdigamma(z, dvec),
-            digamma(k),
-        )
-    return (-a - b + c + d) / log(base)
-
-
-def cmi(x, y, z, k=3, base=2):
-    """Mutual information of x and y, conditioned on z
-    Legacy function. Use mi(x, y, z) directly.
-    """
-    return mi(x, y, z=z, k=k, base=base)
-
-
-def kldiv(x, xp, k=3, base=2):
-    """KL Divergence between p and q for x~p(x), xp~q(x)
-    x, xp should be a list of vectors, e.g. x = [[1.3], [3.7], [5.1],[2.4]]
-    if x is a one-dimensional scalar and we have four samples
-    """
-    assert k < min(len(x), len(xp)), "Set k smaller than num. samples - 1"
-    assert len(x[0]) == len(xp[0]), "Two distributions must have same dim."
-    x, xp = np.asarray(x), np.asarray(xp)
-    x, xp = x.reshape(x.shape[0], -1), xp.reshape(xp.shape[0], -1)
-    d = len(x[0])
-    n = len(x)
-    m = len(xp)
-    const = log(m) - log(n - 1)
-    tree = build_tree(x)
-    treep = build_tree(xp)
-    nn = query_neighbors(tree, x, k)
-    nnp = query_neighbors(treep, x, k - 1)
-    return (const + d * (np.log(nnp).mean() - np.log(nn).mean())) / log(base)
-
-
-def lnc_correction(tree, points, k, alpha):
-    e = 0
-    n_sample = points.shape[0]
-    for point in points:
-        # Find k-nearest neighbors in joint space, p=inf means max norm
-        knn = tree.query(point[None, :], k=k + 1, return_distance=False)[0]
-        knn_points = points[knn]
-        # Substract mean of k-nearest neighbor points
-        knn_points = knn_points - knn_points[0]
-        # Calculate covariance matrix of k-nearest neighbor points, obtain eigen vectors
-        covr = knn_points.T @ knn_points / k
-        _, v = la.eig(covr)
-        # Calculate PCA-bounding box using eigen vectors
-        V_rect = np.log(np.abs(knn_points @ v).max(axis=0)).sum()
-        # Calculate the volume of original box
-        log_knn_dist = np.log(np.abs(knn_points).max(axis=0)).sum()
-
-        # Perform local non-uniformity checking and update correction term
-        if V_rect < log_knn_dist + np.log(alpha):
-            e += (log_knn_dist - V_rect) / n_sample
-    return e
-
-
-# DISCRETE ESTIMATORS
-def entropyd(sx, base=2):
-    """Discrete entropy estimator
-    sx is a list of samples
-    """
-    unique, count = np.unique(sx, return_counts=True, axis=0)
-    # Convert to float as otherwise integer division results in all 0 for proba.
-    proba = count.astype(float) / len(sx)
-    # Avoid 0 division; remove probabilities == 0.0 (removing them does not change the entropy estimate as 0 * log(1/0) = 0.
-    proba = proba[proba > 0.0]
-    return np.sum(proba * np.log(1.0 / proba)) / log(base)
-
-
-def midd(x, y, base=2):
-    """Discrete mutual information estimator
-    Given a list of samples which can be any hashable object
-    """
-    assert len(x) == len(y), "Arrays should have same length"
-    return entropyd(x, base) - centropyd(x, y, base)
-
-
-def cmidd(x, y, z, base=2):
-    """Discrete mutual information estimator
-    Given a list of samples which can be any hashable object
-    """
-    assert len(x) == len(y) == len(z), "Arrays should have same length"
-    xz = np.c_[x, z]
-    yz = np.c_[y, z]
-    xyz = np.c_[x, y, z]
-    return (
-        entropyd(xz, base)
-        + entropyd(yz, base)
-        - entropyd(xyz, base)
-        - entropyd(z, base)
-    )
-
-
-def centropyd(x, y, base=2):
-    """The classic K-L k-nearest neighbor continuous entropy estimator for the
-    entropy of X conditioned on Y.
-    """
-    xy = np.c_[x, y]
-    return entropyd(xy, base) - entropyd(y, base)
-
-
-def tcd(xs, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    entropy_features = [entropyd(col, base=base) for col in xs_columns]
-    return np.sum(entropy_features) - entropyd(xs, base)
-
-
-def ctcd(xs, y, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    centropy_features = [centropyd(col, y, base=base) for col in xs_columns]
-    return np.sum(centropy_features) - centropyd(xs, y, base)
-
-
-def corexd(xs, ys, base=2):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    cmi_features = [midd(col, ys, base=base) for col in xs_columns]
-    return np.sum(cmi_features) - midd(xs, ys, base)
-
-
-# MIXED ESTIMATORS
-def micd(x, y, k=3, base=2, warning=True):
-    """If x is continuous and y is discrete, compute mutual information"""
-    assert len(x) == len(y), "Arrays should have same length"
-    entropy_x = entropy(x, k, base)
-
-    y_unique, y_count = np.unique(y, return_counts=True, axis=0)
-    y_proba = y_count / len(y)
-
-    entropy_x_given_y = 0.0
-    for yval, py in zip(y_unique, y_proba):
-        x_given_y = x[(y == yval).all(axis=1)]
-        if k <= len(x_given_y) - 1:
-            entropy_x_given_y += py * entropy(x_given_y, k, base)
-        else:
-            if warning:
-                warnings.warn(
-                    "Warning, after conditioning, on y={yval} insufficient data. "
-                    "Assuming maximal entropy in this case.".format(yval=yval)
-                )
-            entropy_x_given_y += py * entropy_x
-    return abs(entropy_x - entropy_x_given_y)  # units already applied
-
-
-def midc(x, y, k=3, base=2, warning=True):
-    return micd(y, x, k, base, warning)
-
-
-def centropycd(x, y, k=3, base=2, warning=True):
-    return entropy(x, base) - micd(x, y, k, base, warning)
-
-
-def centropydc(x, y, k=3, base=2, warning=True):
-    return centropycd(y, x, k=k, base=base, warning=warning)
-
-
-def ctcdc(xs, y, k=3, base=2, warning=True):
-    xs_columns = np.expand_dims(xs, axis=0).T
-    centropy_features = [
-        centropydc(col, y, k=k, base=base, warning=warning)
-        for col in xs_columns
-    ]
-    return np.sum(centropy_features) - centropydc(xs, y, k, base, warning)
-
-
-def ctccd(xs, y, k=3, base=2, warning=True):
-    return ctcdc(y, xs, k=k, base=base, warning=warning)
-
-
-def corexcd(xs, ys, k=3, base=2, warning=True):
-    return corexdc(ys, xs, k=k, base=base, warning=warning)
-
-
-def corexdc(xs, ys, k=3, base=2, warning=True):
-    return tcd(xs, base) - ctcdc(xs, ys, k, base, warning)
-
-
-# UTILITY FUNCTIONS
-
-
-def add_noise(x, intens=1e-10):
-    # small noise to break degeneracy, see doc.
-    return x + intens * np.random.random_sample(x.shape)
-
-
-def query_neighbors(tree, x, k):
-    return tree.query(x, k=k + 1)[0][:, k]
-
-
-def count_neighbors(tree, x, r):
-    return tree.query_radius(x, r, count_only=True)
-
-
-def avgdigamma(points, dvec):
-    # This part finds number of neighbors in some radius in the marginal space
-    # returns expectation value of
-    tree = build_tree(points)
-    dvec = dvec - 1e-15
-    num_points = count_neighbors(tree, points, dvec)
-    return np.mean(digamma(num_points))
-
-
-def build_tree(points):
-    if points.shape[1] >= 20:
-        return BallTree(points, metric="chebyshev")
-    return KDTree(points, metric="chebyshev")
-
-
-# TESTS
-
-
-def shuffle_test(measure, x, y, z=False, ns=200, ci=0.95, **kwargs):
-    """Shuffle test
-    Repeatedly shuffle the x-values and then estimate measure(x, y, [z]).
-    Returns the mean and conf. interval ('ci=0.95' default) over 'ns' runs.
-    'measure' could me mi, cmi, e.g. Keyword arguments can be passed.
-    Mutual information and CMI should have a mean near zero.
-    """
-    x_clone = np.copy(x)  # A copy that we can shuffle
-    outputs = []
-    for _ in range(ns):
-        np.random.shuffle(x_clone)
-        if z:
-            outputs.append(measure(x_clone, y, z, **kwargs))
-        else:
-            outputs.append(measure(x_clone, y, **kwargs))
-    outputs.sort()
-    return np.mean(outputs), (
-        outputs[int((1.0 - ci) / 2 * ns)],
-        outputs[int((1.0 + ci) / 2 * ns)],
-    )
-
-
-if __name__ == "__main__":
-    print("MI between two independent continuous random variables X and Y:")
-    np.random.seed(0)
-    x = np.random.randn(1000, 10)
-    y = np.random.randn(1000, 3)
-    print(mi(x, y, base=2, alpha=0))
diff --git a/mfs/tests/MFS_test.py b/mfs/tests/MFS_test.py
index ea50061..44d7c56 100755
--- a/mfs/tests/MFS_test.py
+++ b/mfs/tests/MFS_test.py
@@ -47,20 +47,19 @@ class MFS_test(unittest.TestCase):
 
     def test_csf_wine_cont(self):
         mfs = MFS(discrete=False)
-        expected = [6, 11, 9, 0, 12, 5]
-        # self.assertListAlmostEqual(
-        #     expected, mfs.cfs(self.X_wc, self.y_w).get_results()
-        # )
+        expected = [10, 6, 0, 2, 9, 7]
+        self.assertListEqual(
+            expected, mfs.cfs(self.X_wc, self.y_w).get_results()
+        )
         expected = [
-            0.5218299405215557,
-            0.602513857132804,
-            0.4877384978817362,
-            0.3743688234383051,
-            0.28795671854246285,
-            0.2309165735173175,
+            0.735264150416997,
+            0.8279580852902876,
+            0.7828768186880067,
+            0.7279815238718462,
+            0.6287944059925545,
+            0.5416637958201808,
         ]
-        # self.assertListAlmostEqual(expected, mfs.get_scores())
-        print(expected, mfs.get_scores())
+        self.assertListAlmostEqual(expected, mfs.get_scores())
 
     def test_csf_max_features(self):
         mfs = MFS(max_features=3)
diff --git a/mfs/tests/Metrics_test.py b/mfs/tests/Metrics_test.py
index e1bd543..94114f6 100755
--- a/mfs/tests/Metrics_test.py
+++ b/mfs/tests/Metrics_test.py
@@ -1,7 +1,6 @@
 import unittest
 import numpy as np
 from sklearn.datasets import load_iris, load_wine
-from ..entropy_estimators import entropy
 from mdlp import MDLP
 
 from ..Selection import Metrics
@@ -71,29 +70,6 @@ class Metrics_test(unittest.TestCase):
         )
         self.assertAlmostEqual(computed, res_expected)
 
-    def test_dif_ent(self):
-        expected = [
-            1.6378708764142766,
-            2.0291571802275037,
-            0.8273865123744271,
-            3.203935772642847,
-            4.859193341386733,
-            1.3707315434976266,
-            1.8794952925706312,
-            -0.2983180654207054,
-            1.4521478934625076,
-            2.834404839362728,
-            0.4894081282811191,
-            1.361210381692561,
-            7.6373991502818175,
-        ]
-        n_samples, n_features = self.X_w_c.shape
-        for c, res_expected in enumerate(expected):
-            computed = entropy(
-                self.X_w_c[:, c].reshape(-1, 1), k=n_samples - 2
-            )
-            print("-*-", computed)
-
     def test_conditional_entropy(self):
         metric = Metrics()
         results_expected = [
@@ -142,6 +118,34 @@
         computed = metric.information_gain(self.X_i[:, col], self.y_i, 2)
         self.assertAlmostEqual(expected, computed)
 
+    def test_information_gain_continuous(self):
+        metric = Metrics()
+        # Wine
+        results_expected = [
+            0.4993916064992192,
+            0.4049969724847222,
+            0.2934244372102506,
+            0.16970372100970632,
+        ]
+        for expected, col in zip(results_expected, range(self.X_w_c.shape[1])):
+            computed = metric.information_gain_cont(
+                self.X_w_c[:, col], self.y_w
+            )
+            self.assertAlmostEqual(expected, computed)
+        # Iris
+        results_expected = [
+            0.32752672968734586,
+            0.0,
+            0.5281084030413838,
+            0.0,
+        ]
+        for expected, col in zip(results_expected, range(self.X_i_c.shape[1])):
+            computed = metric.information_gain_cont(
+                self.X_i_c[:, col].reshape(-1, 1),  # reshape for coverage
+                self.y_i,
+            )
+            self.assertAlmostEqual(expected, computed)
+
     def test_symmetrical_uncertainty(self):
         metric = Metrics()
         results_expected = [
@@ -168,5 +172,4 @@
             computed = metric.symmetrical_unc_continuous(
                 self.X_w_c[:, col], self.y_w
             )
-            # print(computed)
             self.assertAlmostEqual(expected, computed)
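
Note on the estimator wired in above: information_gain_cont now delegates to the inlined _compute_mi_cd (the k-nearest-neighbor estimator for a continuous/discrete pair taken from scikit-learn, credited to Nikolay Mayorov in the added docstring) instead of the deleted NPEET module. A quick, hedged way to sanity-check the values printed by k.py is to compare them with scikit-learn's public mutual_info_classif, which wraps the same estimator; this is only a sketch, it assumes the mfs package from this repository is importable, and exact agreement is not expected because scikit-learn scales each feature and adds a small amount of noise before estimating.

# Sketch: compare Metrics.information_gain_cont with sklearn's public wrapper.
# Assumes the mfs package is installed; small numeric differences are expected
# because mutual_info_classif scales X and adds tiny noise before estimating.
import numpy as np
from sklearn.datasets import load_wine
from sklearn.feature_selection import mutual_info_classif

from mfs.Metrics import Metrics

X, y = load_wine(return_X_y=True)

# Public scikit-learn estimate (k-NN based), one value per feature.
mi_sklearn = mutual_info_classif(X, y, n_neighbors=3, random_state=0)

# Inlined estimator, applied feature by feature as k.py does.
mi_mfs = np.array(
    [Metrics.information_gain_cont(X[:, i], y) for i in range(X.shape[1])]
)

for i, (a, b) in enumerate(zip(mi_mfs, mi_sklearn)):
    print(f"feature {i:2d}: mfs={a:.4f} sklearn={b:.4f}")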
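Similarly, the selection-then-score loop that the rewritten k.py runs with Stree can be reproduced with any classifier. The sketch below assumes only the MFS API exercised in the diff (cfs and fcbf returning selected column indices through get_results(), fcbf taking the 5e-2 threshold) and uses DecisionTreeClassifier as a stand-in for the Stree classifier used above.

# Sketch of the k.py workflow: select features, then score a classifier on the
# selected columns. DecisionTreeClassifier is a stand-in for Stree; the MFS
# calls mirror the ones shown in the diff above, and scores are training-set
# accuracy, as in k.py.
from sklearn.datasets import load_wine
from sklearn.tree import DecisionTreeClassifier

from mfs import MFS

X, y = load_wine(return_X_y=True)
selector = MFS(discrete=False)

cfs_cols = selector.cfs(X, y).get_results()
fcbf_cols = selector.fcbf(X, y, 5e-2).get_results()

for name, cols in (
    ("all features", list(range(X.shape[1]))),
    ("cfs", cfs_cols),
    ("fcbf", fcbf_cols),
):
    clf = DecisionTreeClassifier(random_state=0)
    score = clf.fit(X[:, cols], y).score(X[:, cols], y)
    print(f"{name}: {len(cols)} columns, train accuracy {score:.3f}")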