diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..e0f489f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,14 @@ +[run] +branch = True +source = stree + +[report] +exclude_lines = + if self.debug: + pragma: no cover + raise NotImplementedError + if __name__ == .__main__.: +ignore_errors = True +omit = + stree/tests/* + stree/__init__.py \ No newline at end of file diff --git a/.gitignore b/.gitignore index ae603c4..d50268a 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,5 @@ dmypy.json .pyre/ .idea -.vscode \ No newline at end of file +.vscode +.pre-commit-config.yaml \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 055168f..1725523 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,7 @@ os: linux dist: xenial install: - pip install -r requirements.txt + - pip install --upgrade codecov coverage black flake8 notifications: email: recipients: @@ -10,4 +11,10 @@ notifications: on_success: never # default: change on_failure: always # default: always # command to run tests -script: python -m unittest stree.tests \ No newline at end of file +script: + - black --check --diff stree + - flake8 --count --exclude __init__.py stree + - coverage run -m unittest -v stree.tests +after_success: + - codecov + - bash <(curl -Ls https://coverage.codacy.com/get.sh) \ No newline at end of file diff --git a/README.md b/README.md index 540658c..4b3b177 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ [![Build Status](https://travis-ci.com/Doctorado-ML/STree.svg?branch=master)](https://travis-ci.com/Doctorado-ML/STree) - +[![codecov](https://codecov.io/gh/doctorado-ml/stree/branch/master/graph/badge.svg)](https://codecov.io/gh/doctorado-ml/stree) +[![Codacy Badge](https://app.codacy.com/project/badge/Grade/35fa3dfd53a24a339344b33d9f9f2f3d)](https://www.codacy.com/gh/Doctorado-ML/STree?utm_source=github.com&utm_medium=referral&utm_content=Doctorado-ML/STree&utm_campaign=Badge_Grade) # Stree Oblique Tree classifier based on SVM nodes. The nodes are built and splitted with sklearn LinearSVC models.Stree is a sklearn estimator and can be integrated in pipelines, grid searches, etc. 
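Since the README presents Stree as a scikit-learn estimator that can be dropped into pipelines and grid searches, a short usage sketch makes that concrete. It is illustrative only, not part of the patch: it assumes the standard fit/predict/score API plus the C, max_depth and random_state constructor parameters that appear later in this diff, and it uses a synthetic dataset instead of the creditcard data from main.py.

# Hypothetical usage sketch (not part of the patch). Names such as "scale",
# "tree" and the grid values are arbitrary; only the Stree parameters shown
# in Strees.py (C, max_depth, random_state) are assumed.
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from stree import Stree

X, y = make_classification(n_samples=500, n_features=5, random_state=0)
Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, random_state=0)
pipe = Pipeline([("scale", StandardScaler()), ("tree", Stree(random_state=0))])
param_grid = {"tree__C": [0.01, 1.0], "tree__max_depth": [None, 3]}
search = GridSearchCV(pipe, param_grid, cv=3)
search.fit(Xtrain, ytrain)
print(search.best_params_, search.score(Xtest, ytest))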
diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..00da8b6 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,15 @@ +coverage: + status: + project: + default: + target: auto + patch: + default: + target: auto +comment: + layout: "reach, diff, flags, files" + behavior: default + require_changes: false + require_base: yes + require_head: yes + branches: null \ No newline at end of file diff --git a/main.py b/main.py index deda86f..b74b0c8 100644 --- a/main.py +++ b/main.py @@ -2,17 +2,29 @@ import time from sklearn.model_selection import train_test_split from stree import Stree -random_state=1 +random_state = 1 + def load_creditcard(n_examples=0): import pandas as pd import numpy as np import random - df = pd.read_csv('data/creditcard.csv') - print("Fraud: {0:.3f}% {1}".format(df.Class[df.Class == 1].count()*100/df.shape[0], df.Class[df.Class == 1].count())) - print("Valid: {0:.3f}% {1}".format(df.Class[df.Class == 0].count()*100/df.shape[0], df.Class[df.Class == 0].count())) + + df = pd.read_csv("data/creditcard.csv") + print( + "Fraud: {0:.3f}% {1}".format( + df.Class[df.Class == 1].count() * 100 / df.shape[0], + df.Class[df.Class == 1].count(), + ) + ) + print( + "Valid: {0:.3f}% {1}".format( + df.Class[df.Class == 0].count() * 100 / df.shape[0], + df.Class[df.Class == 0].count(), + ) + ) y = np.expand_dims(df.Class.values, axis=1) - X = df.drop(['Class', 'Time', 'Amount'], axis=1).values + X = df.drop(["Class", "Time", "Amount"], axis=1).values if n_examples > 0: # Take first n_examples samples X = X[:n_examples, :] @@ -26,14 +38,30 @@ def load_creditcard(n_examples=0): X = np.append(Xt, X[indices], axis=0) y = np.append(yt, y[indices], axis=0) print("X.shape", X.shape, " y.shape", y.shape) - print("Fraud: {0:.3f}% {1}".format(len(y[y == 1])*100/X.shape[0], len(y[y == 1]))) - print("Valid: {0:.3f}% {1}".format(len(y[y == 0]) * 100 / X.shape[0], len(y[y == 0]))) - Xtrain, Xtest, ytrain, ytest = train_test_split(X, y, train_size=0.7, shuffle=True, random_state=random_state, stratify=y) + print( + "Fraud: {0:.3f}% {1}".format( + len(y[y == 1]) * 100 / X.shape[0], len(y[y == 1]) + ) + ) + print( + "Valid: {0:.3f}% {1}".format( + len(y[y == 0]) * 100 / X.shape[0], len(y[y == 0]) + ) + ) + Xtrain, Xtest, ytrain, ytest = train_test_split( + X, + y, + train_size=0.7, + shuffle=True, + random_state=random_state, + stratify=y, + ) return Xtrain, Xtest, ytrain, ytest + # data = load_creditcard(-5000) # Take all true samples + 5000 of the others # data = load_creditcard(5000) # Take the first 5000 samples -data = load_creditcard() # Take all the samples +data = load_creditcard() # Take all the samples Xtrain = data[0] Xtest = data[1] @@ -41,17 +69,20 @@ ytrain = data[2] ytest = data[3] now = time.time() -clf = Stree(C=.01, random_state=random_state) +clf = Stree(C=0.01, random_state=random_state) clf.fit(Xtrain, ytrain) print(f"Took {time.time() - now:.2f} seconds to train") print(clf) print(f"Classifier's accuracy (train): {clf.score(Xtrain, ytrain):.4f}") print(f"Classifier's accuracy (test) : {clf.score(Xtest, ytest):.4f}") proba = clf.predict_proba(Xtest) -print("Checking that we have correct probabilities, these are probabilities of sample belonging to class 1") +print( + "Checking that we have correct probabilities, these are probabilities of " + "sample belonging to class 1" +) res0 = proba[proba[:, 0] == 0] res1 = proba[proba[:, 0] == 1] print("++++++++++res0 > .8++++++++++++") -print(res0[res0[:, 1] > .8]) +print(res0[res0[:, 1] > 0.8]) print("**********res1 < 
.4************") -print(res1[res1[:, 1] < .4]) \ No newline at end of file +print(res1[res1[:, 1] < 0.4]) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9bd6669 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.black] +line-length = 79 +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.hg + | \.mypy_cache + | \.tox + | \.venv + | _build + | buck-out + | build + | dist +)/ +''' \ No newline at end of file diff --git a/setup.py b/setup.py index 14acc0c..290331a 100644 --- a/setup.py +++ b/setup.py @@ -5,37 +5,32 @@ __author__ = "Ricardo Montañana Gómez" def readme(): - with open('README.md') as f: + with open("README.md") as f: return f.read() setuptools.setup( - name='STree', + name="STree", version=__version__, - license='MIT License', - description='Oblique decision tree with svm nodes', + license="MIT License", + description="Oblique decision tree with svm nodes", long_description=readme(), - long_description_content_type='text/markdown', + long_description_content_type="text/markdown", packages=setuptools.find_packages(), - url='https://github.com/doctorado-ml/stree', + url="https://github.com/doctorado-ml/stree", author=__author__, - author_email='ricardo.montanana@alu.uclm.es', - keywords='scikit-learn oblique-classifier oblique-decision-tree decision-\ - tree svm svc', + author_email="ricardo.montanana@alu.uclm.es", + keywords="scikit-learn oblique-classifier oblique-decision-tree decision-\ + tree svm svc", classifiers=[ - 'Development Status :: 4 - Beta', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.7', - 'Natural Language :: English', - 'Topic :: Scientific/Engineering :: Artificial Intelligence', - 'Intended Audience :: Science/Research' - ], - install_requires=[ - 'scikit-learn>=0.23.0', - 'numpy', - 'matplotlib', - 'ipympl' + "Development Status :: 4 - Beta", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.7", + "Natural Language :: English", + "Topic :: Scientific/Engineering :: Artificial Intelligence", + "Intended Audience :: Science/Research", ], + install_requires=["scikit-learn>=0.23.0", "numpy", "matplotlib", "ipympl"], test_suite="stree.tests", - zip_safe=False + zip_safe=False, ) diff --git a/stree/Strees.py b/stree/Strees.py index a910d3c..ecdc1df 100644 --- a/stree/Strees.py +++ b/stree/Strees.py @@ -1,11 +1,11 @@ -''' +""" __author__ = "Ricardo Montañana Gómez" __copyright__ = "Copyright 2020, Ricardo Montañana Gómez" __license__ = "MIT" __version__ = "0.9" Build an oblique tree classifier based on SVM Trees Uses LinearSVC -''' +""" import os @@ -13,8 +13,12 @@ import numpy as np from sklearn.base import BaseEstimator, ClassifierMixin from sklearn.svm import LinearSVC from sklearn.utils.multiclass import check_classification_targets -from sklearn.utils.validation import check_X_y, check_array, check_is_fitted, \ - _check_sample_weight +from sklearn.utils.validation import ( + check_X_y, + check_array, + check_is_fitted, + _check_sample_weight, +) class Snode: @@ -22,22 +26,23 @@ class Snode: dataset assigned to it """ - def __init__(self, clf: LinearSVC, X: np.ndarray, y: np.ndarray, - title: str): + def __init__( + self, clf: LinearSVC, X: np.ndarray, y: np.ndarray, title: str + ): self._clf = clf self._vector = None if clf is None else clf.coef_ - self._interceptor = 0. if clf is None else clf.intercept_ + self._interceptor = 0.0 if clf is None else clf.intercept_ self._title = title - self._belief = 0. 
+ self._belief = 0.0 # Only store dataset in Testing - self._X = X if os.environ.get('TESTING', 'NS') != 'NS' else None + self._X = X if os.environ.get("TESTING", "NS") != "NS" else None self._y = y self._down = None self._up = None self._class = None @classmethod - def copy(cls, node: 'Snode') -> 'Snode': + def copy(cls, node: "Snode") -> "Snode": return cls(node._clf, node._X, node._y, node._title) def set_down(self, son): @@ -49,10 +54,10 @@ class Snode: def is_leaf(self) -> bool: return self._up is None and self._down is None - def get_down(self) -> 'Snode': + def get_down(self) -> "Snode": return self._down - def get_up(self) -> 'Snode': + def get_up(self) -> "Snode": return self._up def make_predictor(self): @@ -68,7 +73,7 @@ class Snode: try: self._belief = max_card / (max_card + min_card) except ZeroDivisionError: - self._belief = 0. + self._belief = 0.0 self._class = classes[card == max_card][0] else: self._belief = 1 @@ -77,8 +82,10 @@ class Snode: def __str__(self) -> str: if self.is_leaf(): count_values = np.unique(self._y, return_counts=True) - result = f"{self._title} - Leaf class={self._class} belief="\ + result = ( + f"{self._title} - Leaf class={self._class} belief=" f"{self._belief: .6f} counts={count_values}" + ) return result else: return f"{self._title}" @@ -116,9 +123,15 @@ class Stree(BaseEstimator, ClassifierMixin): with "classifier" as value """ - def __init__(self, C: float = 1.0, max_iter: int = 1000, - random_state: int = None, max_depth: int = None, - tol: float = 1e-4, use_predictions: bool = False): + def __init__( + self, + C: float = 1.0, + max_iter: int = 1000, + random_state: int = None, + max_depth: int = None, + tol: float = 1e-4, + use_predictions: bool = False, + ): self.max_iter = max_iter self.C = C self.random_state = random_state @@ -132,7 +145,7 @@ class Stree(BaseEstimator, ClassifierMixin): :return: the tag required :rtype: dict """ - return {'binary_only': True, 'requires_y': True} + return {"binary_only": True, "requires_y": True} def _linear_function(self, data: np.array, node: Snode) -> np.array: """Compute the distance of set of samples to a hyperplane, in @@ -140,9 +153,9 @@ class Stree(BaseEstimator, ClassifierMixin): hyperplane of each class :param data: dataset of samples - :type data: np.array + :type data: np.array shape(m, n) :param node: the node that contains the hyperplance coefficients - :type node: Snode + :type node: Snode shape(1, n) :return: array of distances of each sample to the hyperplane :rtype: np.array """ @@ -160,8 +173,10 @@ class Stree(BaseEstimator, ClassifierMixin): :rtype: list """ up = ~down - return origin[up[:, 0]] if any(up) else None, \ - origin[down[:, 0]] if any(down) else None + return ( + origin[up[:, 0]] if any(up) else None, + origin[down[:, 0]] if any(down) else None, + ) def _distances(self, node: Snode, data: np.ndarray) -> np.array: """Compute distances of the samples to the hyperplane of the node @@ -194,8 +209,9 @@ class Stree(BaseEstimator, ClassifierMixin): """ return data > 0 - def fit(self, X: np.ndarray, y: np.ndarray, - sample_weight: np.array = None) -> 'Stree': + def fit( + self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None + ) -> "Stree": """Build the tree based on the dataset of samples and its labels :raises ValueError: if parameters C or max_depth are out of bounds @@ -203,17 +219,22 @@ class Stree(BaseEstimator, ClassifierMixin): :rtype: Stree """ # Check parameters are Ok. 
- if type(y).__name__ == 'np.ndarray': + if type(y).__name__ == "np.ndarray": y = y.ravel() if self.C < 0: raise ValueError( - f"Penalty term must be positive... got (C={self.C:f})") - self.__max_depth = np.iinfo( - np.int32).max if self.max_depth is None else self.max_depth + f"Penalty term must be positive... got (C={self.C:f})" + ) + self.__max_depth = ( + np.iinfo(np.int32).max + if self.max_depth is None + else self.max_depth + ) if self.__max_depth < 1: raise ValueError( f"Maximum depth has to be greater than 1... got (max_depth=\ - {self.max_depth})") + {self.max_depth})" + ) check_classification_targets(y) X, y = check_X_y(X, y) sample_weight = _check_sample_weight(sample_weight, X) @@ -223,13 +244,14 @@ class Stree(BaseEstimator, ClassifierMixin): self.n_iter_ = self.max_iter self.depth_ = 0 self.n_features_in_ = X.shape[1] - self.tree_ = self.train(X, y, sample_weight, 1, 'root') + self.tree_ = self.train(X, y, sample_weight, 1, "root") self._build_predictor() return self def _build_predictor(self): """Process the leaves to make them predictors """ + def run_tree(node: Snode): if node.is_leaf(): node.make_predictor() @@ -239,8 +261,14 @@ class Stree(BaseEstimator, ClassifierMixin): run_tree(self.tree_) - def train(self, X: np.ndarray, y: np.ndarray, sample_weight: np.ndarray, - depth: int, title: str) -> Snode: + def train( + self, + X: np.ndarray, + y: np.ndarray, + sample_weight: np.ndarray, + depth: int, + title: str, + ) -> Snode: """Recursive function to split the original dataset into predictor nodes (leaves) @@ -261,10 +289,11 @@ class Stree(BaseEstimator, ClassifierMixin): return None if np.unique(y).shape[0] == 1: # only 1 class => pure dataset - return Snode(None, X, y, title + ', ') + return Snode(None, X, y, title + ", ") # Train the model - clf = LinearSVC(max_iter=self.max_iter, random_state=self.random_state, - C=self.C) # , sample_weight=sample_weight) + clf = LinearSVC( + max_iter=self.max_iter, random_state=self.random_state, C=self.C + ) # , sample_weight=sample_weight) clf.fit(X, y, sample_weight=sample_weight) tree = Snode(clf, X, y, title) self.depth_ = max(depth, self.depth_) @@ -274,9 +303,9 @@ class Stree(BaseEstimator, ClassifierMixin): sw_u, sw_d = self._split_array(sample_weight, down) if X_U is None or X_D is None: # didn't part anything - return Snode(clf, X, y, title + ', ') - tree.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + ' - Up')) - tree.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + ' - Down')) + return Snode(clf, X, y, title + ", ") + tree.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up")) + tree.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down")) return tree def _reorder_results(self, y: np.array, indices: np.array) -> np.array: @@ -308,8 +337,10 @@ class Stree(BaseEstimator, ClassifierMixin): :return: array of labels :rtype: np.array """ - def predict_class(xp: np.array, indices: np.array, - node: Snode) -> np.array: + + def predict_class( + xp: np.array, indices: np.array, node: Snode + ) -> np.array: if xp is None: return [], [] if node.is_leaf(): @@ -322,14 +353,18 @@ class Stree(BaseEstimator, ClassifierMixin): prx_u, prin_u = predict_class(X_U, i_u, node.get_up()) prx_d, prin_d = predict_class(X_D, i_d, node.get_down()) return np.append(prx_u, prx_d), np.append(prin_u, prin_d) + # sklearn check - check_is_fitted(self, ['tree_']) + check_is_fitted(self, ["tree_"]) # Input validation X = check_array(X) # setup prediction & make it happen indices = np.arange(X.shape[0]) - result = 
self._reorder_results( - *predict_class(X, indices, self.tree_)).astype(int).ravel() + result = ( + self._reorder_results(*predict_class(X, indices, self.tree_)) + .astype(int) + .ravel() + ) return self.classes_[result] def predict_proba(self, X: np.array) -> np.array: @@ -341,8 +376,10 @@ class Stree(BaseEstimator, ClassifierMixin): each class :rtype: np.array """ - def predict_class(xp: np.array, indices: np.array, dist: np.array, - node: Snode) -> np.array: + + def predict_class( + xp: np.array, indices: np.array, dist: np.array, node: Snode + ) -> np.array: """Run the tree to compute predictions :param xp: subdataset of samples @@ -375,7 +412,7 @@ class Stree(BaseEstimator, ClassifierMixin): return np.append(prx_u, prx_d), np.append(prin_u, prin_d) # sklearn check - check_is_fitted(self, ['tree_']) + check_is_fitted(self, ["tree_"]) # Input validation X = check_array(X) # setup prediction & make it happen @@ -426,7 +463,7 @@ class Stree(BaseEstimator, ClassifierMixin): :return: description of nodes in the tree in preorder :rtype: str """ - output = '' + output = "" for i in self: - output += str(i) + '\n' + output += str(i) + "\n" return output diff --git a/stree/Strees_grapher.py b/stree/Strees_grapher.py index c6f7a46..c9c425e 100644 --- a/stree/Strees_grapher.py +++ b/stree/Strees_grapher.py @@ -1,10 +1,10 @@ -''' +""" __author__ = "Ricardo Montañana Gómez" __copyright__ = "Copyright 2020, Ricardo Montañana Gómez" __license__ = "MIT" __version__ = "0.9" Plot 3D views of nodes in Stree -''' +""" import os @@ -17,7 +17,6 @@ from .Strees import Stree, Snode, Siterator class Snode_graph(Snode): - def __init__(self, node: Stree): self._plot_size = (8, 8) self._xlimits = (None, None) @@ -29,34 +28,36 @@ class Snode_graph(Snode): def set_plot_size(self, size: tuple): self._plot_size = size + def get_plot_size(self) -> tuple: + return self._plot_size + def _is_pure(self) -> bool: """is considered pure a leaf node with one label """ if self.is_leaf(): - return self._belief == 1. 
+ return self._belief == 1.0 return False def set_axis_limits(self, limits: tuple): - self._xlimits = limits[0] - self._ylimits = limits[1] - self._zlimits = limits[2] + self._xlimits, self._ylimits, self._zlimits = limits def _set_graphics_axis(self, ax: Axes3D): ax.set_xlim(self._xlimits) ax.set_ylim(self._ylimits) ax.set_zlim(self._zlimits) - def save_hyperplane(self, save_folder: str = './', save_prefix: str = '', - save_seq: int = 1): + def save_hyperplane( + self, save_folder: str = "./", save_prefix: str = "", save_seq: int = 1 + ): _, fig = self.plot_hyperplane() name = f"{save_folder}{save_prefix}STnode{save_seq}.png" - fig.savefig(name, bbox_inches='tight') + fig.savefig(name, bbox_inches="tight") plt.close(fig) def _get_cmap(self): - cmap = 'jet' + cmap = "jet" if self._is_pure() and self._class == 1: - cmap = 'jet_r' + cmap = "jet_r" return cmap def _graph_title(self): @@ -65,22 +66,31 @@ class Snode_graph(Snode): def plot_hyperplane(self, plot_distribution: bool = True): fig = plt.figure(figsize=self._plot_size) - ax = fig.add_subplot(1, 1, 1, projection='3d') + ax = fig.add_subplot(1, 1, 1, projection="3d") if not self._is_pure(): # Can't plot hyperplane of leaves with one label because it hasn't # classiffier # get the splitting hyperplane - def hyperplane(x, y): return (-self._interceptor - - self._vector[0][0] * x - - self._vector[0][1] * y) \ - / self._vector[0][2] + def hyperplane(x, y): + return ( + -self._interceptor + - self._vector[0][0] * x + - self._vector[0][1] * y + ) / self._vector[0][2] tmpx = np.linspace(self._X[:, 0].min(), self._X[:, 0].max()) tmpy = np.linspace(self._X[:, 1].min(), self._X[:, 1].max()) xx, yy = np.meshgrid(tmpx, tmpy) - ax.plot_surface(xx, yy, hyperplane(xx, yy), alpha=.5, - antialiased=True, rstride=1, cstride=1, - cmap='seismic') + ax.plot_surface( + xx, + yy, + hyperplane(xx, yy), + alpha=0.5, + antialiased=True, + rstride=1, + cstride=1, + cmap="seismic", + ) self._set_graphics_axis(ax) if plot_distribution: self.plot_distribution(ax) @@ -92,14 +102,15 @@ class Snode_graph(Snode): def plot_distribution(self, ax: Axes3D = None): if ax is None: fig = plt.figure(figsize=self._plot_size) - ax = fig.add_subplot(1, 1, 1, projection='3d') + ax = fig.add_subplot(1, 1, 1, projection="3d") plt.title(self._graph_title()) cmap = self._get_cmap() - ax.scatter(self._X[:, 0], self._X[:, 1], - self._X[:, 2], c=self._y, cmap=cmap) - ax.set_xlabel('X0') - ax.set_ylabel('X1') - ax.set_zlabel('X2') + ax.scatter( + self._X[:, 0], self._X[:, 1], self._X[:, 2], c=self._y, cmap=cmap + ) + ax.set_xlabel("X0") + ax.set_ylabel("X1") + ax.set_zlabel("X2") plt.show() @@ -112,17 +123,17 @@ class Stree_grapher(Stree): self._plot_size = (8, 8) self._tree_gr = None # make Snode store X's - os.environ['TESTING'] = '1' + os.environ["TESTING"] = "1" self._fitted = False self._pca = None super().__init__(**params) def __del__(self): try: - os.environ.pop('TESTING') + os.environ.pop("TESTING") except KeyError: pass - plt.close('all') + plt.close("all") def _copy_tree(self, node: Snode) -> Snode_graph: mirror = Snode_graph(node) @@ -161,9 +172,9 @@ class Stree_grapher(Stree): def _check_fitted(self): if not self._fitted: - raise Exception('Have to fit the grapher first!') + raise Exception("Have to fit the grapher first!") - def save_all(self, save_folder: str = './', save_prefix: str = ''): + def save_all(self, save_folder: str = "./", save_prefix: str = ""): """Save all the node plots in png format, each with a sequence number :param save_folder: folder where the plots are 
saved, defaults to './' @@ -174,8 +185,9 @@ class Stree_grapher(Stree): os.mkdir(save_folder) seq = 1 for node in self: - node.save_hyperplane(save_folder=save_folder, - save_prefix=save_prefix, save_seq=seq) + node.save_hyperplane( + save_folder=save_folder, save_prefix=save_prefix, save_seq=seq + ) seq += 1 def plot_all(self): diff --git a/stree/__init__.py b/stree/__init__.py index e242e2e..fd32aae 100644 --- a/stree/__init__.py +++ b/stree/__init__.py @@ -1,2 +1,4 @@ from .Strees import Stree, Snode, Siterator -from .Strees_grapher import Stree_grapher, Snode_graph \ No newline at end of file +from .Strees_grapher import Stree_grapher, Snode_graph + +__all__ = ["Stree", "Snode", "Siterator", "Stree_grapher", "Snode_graph"] diff --git a/stree/tests/Strees_grapher_test.py b/stree/tests/Strees_grapher_test.py new file mode 100644 index 0000000..cd47593 --- /dev/null +++ b/stree/tests/Strees_grapher_test.py @@ -0,0 +1,211 @@ +import os +import imghdr +import unittest + +import numpy as np +import matplotlib +import matplotlib.pyplot as plt +import warnings +from sklearn.datasets import make_classification + +from stree import Stree_grapher, Snode_graph + + +def get_dataset(random_state=0, n_features=3): + X, y = make_classification( + n_samples=1500, + n_features=n_features, + n_informative=3, + n_redundant=0, + n_repeated=0, + n_classes=2, + n_clusters_per_class=2, + class_sep=1.5, + flip_y=0, + weights=[0.5, 0.5], + random_state=random_state, + ) + return X, y + + +class Stree_grapher_test(unittest.TestCase): + def __init__(self, *args, **kwargs): + os.environ["TESTING"] = "1" + self._random_state = 1 + self._clf = Stree_grapher( + dict(random_state=self._random_state, use_predictions=False) + ) + self._clf.fit(*get_dataset(self._random_state, n_features=4)) + super().__init__(*args, **kwargs) + + @classmethod + def tearDownClass(cls): + try: + os.environ.pop("TESTING") + except KeyError: + pass + + def test_iterator(self): + """Check preorder iterator + """ + expected = [ + "root", + "root - Down", + "root - Down - Down, - Leaf class=1 belief= 0.976023 counts" + "=(array([0, 1]), array([ 17, 692]))", + "root - Down - Up", + "root - Down - Up - Down, - Leaf class=0 belief= 0.500000 " + "counts=(array([0, 1]), array([1, 1]))", + "root - Down - Up - Up, - Leaf class=0 belief= 0.888889 " + "counts=(array([0, 1]), array([8, 1]))", + "root - Up, - Leaf class=0 belief= 0.928205 counts=(array(" + "[0, 1]), array([724, 56]))", + ] + computed = [] + for node in self._clf: + computed.append(str(node)) + self.assertListEqual(expected, computed) + + def test_score(self): + X, y = get_dataset(self._random_state) + accuracy_score = self._clf.score(X, y) + yp = self._clf.predict(X) + accuracy_computed = np.mean(yp == y) + self.assertEqual(accuracy_score, accuracy_computed) + self.assertGreater(accuracy_score, 0.86) + + def test_save_all(self): + folder_name = "/tmp/" + file_names = [f"{folder_name}STnode{i}.png" for i in range(1, 8)] + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + self._clf.save_all(save_folder=folder_name) + for file_name in file_names: + self.assertTrue(os.path.exists(file_name)) + self.assertEqual("png", imghdr.what(file_name)) + os.remove(file_name) + + def test_plot_all(self): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + num_figures_before = plt.gcf().number + self._clf.plot_all() + num_figures_after = plt.gcf().number + self.assertEqual(7, num_figures_after - num_figures_before) + + def 
test_filt_4_dims_dataset(self): + self._clf.fit(*get_dataset(self._random_state, n_features=4)) + + +class Snode_graph_test(unittest.TestCase): + def __init__(self, *args, **kwargs): + os.environ["TESTING"] = "1" + self._random_state = 1 + self._clf = Stree_grapher( + dict(random_state=self._random_state, use_predictions=False) + ) + self._clf.fit(*get_dataset(self._random_state)) + super().__init__(*args, **kwargs) + + @classmethod + def tearDownClass(cls): + """Remove the testing environ variable + """ + try: + os.environ.pop("TESTING") + except KeyError: + pass + + def test_plot_size(self): + default = self._clf._tree_gr.get_plot_size() + expected = (17, 3) + self._clf._tree_gr.set_plot_size(expected) + self.assertEqual(expected, self._clf._tree_gr.get_plot_size()) + self._clf._tree_gr.set_plot_size(default) + self.assertEqual(default, self._clf._tree_gr.get_plot_size()) + + def test_attributes_in_leaves_graph(self): + """Check if the attributes in leaves have correct values so they form a + predictor + """ + + def check_leave(node: Snode_graph): + if not node.is_leaf(): + check_leave(node.get_down()) + check_leave(node.get_up()) + return + # Check Belief in leave + classes, card = np.unique(node._y, return_counts=True) + max_card = max(card) + min_card = min(card) + if len(classes) > 1: + try: + belief = max_card / (max_card + min_card) + except ZeroDivisionError: + belief = 0.0 + else: + belief = 1 + self.assertEqual(belief, node._belief) + # Check Class + class_computed = classes[card == max_card] + self.assertEqual(class_computed, node._class) + + check_leave(self._clf._tree_gr) + + def test_nodes_graph_coefs(self): + """Check if the nodes of the tree have the right attributes filled + """ + + def run_tree(node: Snode_graph): + if node._belief < 1: + # only exclude pure leaves + self.assertIsNotNone(node._clf) + self.assertIsNotNone(node._clf.coef_) + self.assertIsNotNone(node._vector) + self.assertIsNotNone(node._interceptor) + if node.is_leaf(): + return + run_tree(node.get_down()) + run_tree(node.get_up()) + + run_tree(self._clf._tree_gr) + + def test_save_hyperplane(self): + folder_name = "/tmp/" + file_name = f"{folder_name}STnode1.png" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + self._clf._tree_gr.save_hyperplane(folder_name) + self.assertTrue(os.path.exists(file_name)) + self.assertEqual("png", imghdr.what(file_name)) + os.remove(file_name) + + def test_plot_hyperplane_with_distribution(self): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + num_figures_before = plt.gcf().number + self._clf._tree_gr.plot_hyperplane(plot_distribution=True) + num_figures_after = plt.gcf().number + self.assertEqual(1, num_figures_after - num_figures_before) + + def test_plot_hyperplane_without_distribution(self): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + num_figures_before = plt.gcf().number + self._clf._tree_gr.plot_hyperplane(plot_distribution=False) + num_figures_after = plt.gcf().number + self.assertEqual(1, num_figures_after - num_figures_before) + + def test_plot_distribution(self): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + matplotlib.use("Agg") + num_figures_before = plt.gcf().number + self._clf._tree_gr.plot_distribution() + num_figures_after = plt.gcf().number + self.assertEqual(1, num_figures_after - num_figures_before) diff --git a/stree/tests/Strees_test.py b/stree/tests/Strees_test.py index ef0b211..1a63380 
100644 --- a/stree/tests/Strees_test.py +++ b/stree/tests/Strees_test.py @@ -7,30 +7,54 @@ from sklearn.datasets import make_classification from stree import Stree, Snode -class Stree_test(unittest.TestCase): +def get_dataset(random_state=0): + X, y = make_classification( + n_samples=1500, + n_features=3, + n_informative=3, + n_redundant=0, + n_repeated=0, + n_classes=2, + n_clusters_per_class=2, + class_sep=1.5, + flip_y=0, + weights=[0.5, 0.5], + random_state=random_state, + ) + return X, y + +class Stree_test(unittest.TestCase): def __init__(self, *args, **kwargs): - os.environ['TESTING'] = '1' + os.environ["TESTING"] = "1" self._random_state = 1 - self._clf = Stree(random_state=self._random_state, - use_predictions=False) - self._clf.fit(*self._get_Xy()) + self._clf = Stree( + random_state=self._random_state, use_predictions=False + ) + self._clf.fit(*get_dataset(self._random_state)) super().__init__(*args, **kwargs) @classmethod def tearDownClass(cls): try: - os.environ.pop('TESTING') + os.environ.pop("TESTING") except KeyError: pass def _get_Xy(self): - X, y = make_classification(n_samples=1500, n_features=3, - n_informative=3, n_redundant=0, - n_repeated=0, n_classes=2, - n_clusters_per_class=2, class_sep=1.5, - flip_y=0, weights=[0.5, 0.5], - random_state=self._random_state) + X, y = make_classification( + n_samples=1500, + n_features=3, + n_informative=3, + n_redundant=0, + n_repeated=0, + n_classes=2, + n_clusters_per_class=2, + class_sep=1.5, + flip_y=0, + weights=[0.5, 0.5], + random_state=self._random_state, + ) return X, y def _check_tree(self, node: Snode): @@ -85,15 +109,16 @@ class Stree_test(unittest.TestCase): Returns: tuple -- tuple with samples, categories """ - data = np.genfromtxt(file_name, delimiter=',') + data = np.genfromtxt(file_name, delimiter=",") data = np.array(data) column_y = data.shape[1] - 1 fy = data[:, column_y] fx = np.delete(data, column_y, axis=1) return fx, fy - def _find_out(self, px: np.array, x_original: np.array, - y_original) -> list: + def _find_out( + self, px: np.array, x_original: np.array, y_original + ) -> list: """Find the original values of y for a given array of samples Arguments: @@ -112,19 +137,19 @@ class Stree_test(unittest.TestCase): return res def test_single_prediction(self): - X, y = self._get_Xy() + X, y = get_dataset(self._random_state) yp = self._clf.predict((X[0, :].reshape(-1, X.shape[1]))) self.assertEqual(yp[0], y[0]) def test_multiple_prediction(self): # First 27 elements the predictions are the same as the truth num = 27 - X, y = self._get_Xy() + X, y = get_dataset(self._random_state) yp = self._clf.predict(X[:num, :]) self.assertListEqual(y[:num].tolist(), yp.tolist()) def test_score(self): - X, y = self._get_Xy() + X, y = get_dataset(self._random_state) accuracy_score = self._clf.score(X, y) yp = self._clf.predict(X) accuracy_computed = np.mean(yp == y) @@ -138,35 +163,55 @@ class Stree_test(unittest.TestCase): # Element 28 has a different prediction than the truth decimals = 5 prob = 0.29026400766 - X, y = self._get_Xy() + X, y = get_dataset(self._random_state) yp = self._clf.predict_proba(X[28, :].reshape(-1, X.shape[1])) - self.assertEqual(np.round(1 - prob, decimals), - np.round(yp[0:, 0], decimals)) + self.assertEqual( + np.round(1 - prob, decimals), np.round(yp[0:, 0], decimals) + ) self.assertEqual(1, y[28]) self.assertAlmostEqual( - round(prob, decimals), - round(yp[0, 1], decimals), - decimals + round(prob, decimals), round(yp[0, 1], decimals), decimals ) def test_multiple_predict_proba(self): # First 27 
elements the predictions are the same as the truth num = 27 decimals = 5 - X, y = self._get_Xy() + X, y = get_dataset(self._random_state) yp = self._clf.predict_proba(X[:num, :]) self.assertListEqual( - y[:num].tolist(), np.argmax(yp[:num], axis=1).tolist()) - expected_proba = [0.88395641, 0.36746962, 0.84158767, 0.34106833, - 0.14269291, 0.85193236, - 0.29876058, 0.7282164, 0.85958616, 0.89517877, - 0.99745224, 0.18860349, - 0.30756427, 0.8318412, 0.18981198, 0.15564624, - 0.25740655, 0.22923355, - 0.87365959, 0.49928689, 0.95574351, 0.28761257, - 0.28906333, 0.32643692, - 0.29788483, 0.01657364, 0.81149083] + y[:num].tolist(), np.argmax(yp[:num], axis=1).tolist() + ) + expected_proba = [ + 0.88395641, + 0.36746962, + 0.84158767, + 0.34106833, + 0.14269291, + 0.85193236, + 0.29876058, + 0.7282164, + 0.85958616, + 0.89517877, + 0.99745224, + 0.18860349, + 0.30756427, + 0.8318412, + 0.18981198, + 0.15564624, + 0.25740655, + 0.22923355, + 0.87365959, + 0.49928689, + 0.95574351, + 0.28761257, + 0.28906333, + 0.32643692, + 0.29788483, + 0.01657364, + 0.81149083, + ] expected = np.round(expected_proba, decimals=decimals).tolist() computed = np.round(yp[:, 1], decimals=decimals).tolist() for i in range(len(expected)): @@ -178,11 +223,13 @@ class Stree_test(unittest.TestCase): use vector of coefficients to compute both predictions and splitted data """ - model_clf = Stree(random_state=self._random_state, - use_predictions=True) - model_computed = Stree(random_state=self._random_state, - use_predictions=False) - X, y = self._get_Xy() + model_clf = Stree( + random_state=self._random_state, use_predictions=True + ) + model_computed = Stree( + random_state=self._random_state, use_predictions=False + ) + X, y = get_dataset(self._random_state) model_clf.fit(X, y) model_computed.fit(X, y) return model_clf, model_computed, X, y @@ -194,74 +241,76 @@ class Stree_test(unittest.TestCase): """ use_clf, use_math, X, _ = self.build_models() self.assertListEqual( - use_clf.predict(X).tolist(), - use_math.predict(X).tolist() + use_clf.predict(X).tolist(), use_math.predict(X).tolist() ) def test_use_model_score(self): use_clf, use_math, X, y = self.build_models() b = use_math.score(X, y) - self.assertEqual( - use_clf.score(X, y), - b - ) - self.assertGreater(b, .95) + self.assertEqual(use_clf.score(X, y), b) + self.assertGreater(b, 0.95) def test_use_model_predict_proba(self): use_clf, use_math, X, _ = self.build_models() self.assertListEqual( use_clf.predict_proba(X).tolist(), - use_math.predict_proba(X).tolist() + use_math.predict_proba(X).tolist(), ) def test_single_vs_multiple_prediction(self): """Check if predicting sample by sample gives the same result as predicting all samples at once """ - X, _ = self._get_Xy() + X, _ = get_dataset(self._random_state) # Compute prediction line by line yp_line = np.array([], dtype=int) for xp in X: - yp_line = np.append(yp_line, self._clf.predict( - xp.reshape(-1, X.shape[1]))) + yp_line = np.append( + yp_line, self._clf.predict(xp.reshape(-1, X.shape[1])) + ) # Compute prediction at once yp_once = self._clf.predict(X) # self.assertListEqual(yp_line.tolist(), yp_once.tolist()) - def test_iterator(self): + def test_iterator_and_str(self): """Check preorder iterator """ expected = [ - 'root', - 'root - Down', - 'root - Down - Down, - Leaf class=1 belief= 0.975989 counts' - '=(array([0, 1]), array([ 17, 691]))', - 'root - Down - Up', - 'root - Down - Up - Down, - Leaf class=1 belief= 0.750000 ' - 'counts=(array([0, 1]), array([1, 3]))', - 'root - Down - Up - Up, - Leaf 
class=0 belief= 1.000000 ' - 'counts=(array([0]), array([7]))', - 'root - Up, - Leaf class=0 belief= 0.928297 counts=(array(' - '[0, 1]), array([725, 56]))', + "root", + "root - Down", + "root - Down - Down, - Leaf class=1 belief= 0.975989 counts" + "=(array([0, 1]), array([ 17, 691]))", + "root - Down - Up", + "root - Down - Up - Down, - Leaf class=1 belief= 0.750000 " + "counts=(array([0, 1]), array([1, 3]))", + "root - Down - Up - Up, - Leaf class=0 belief= 1.000000 " + "counts=(array([0]), array([7]))", + "root - Up, - Leaf class=0 belief= 0.928297 counts=(array(" + "[0, 1]), array([725, 56]))", ] computed = [] + expected_string = "" for node in self._clf: computed.append(str(node)) + expected_string += str(node) + "\n" self.assertListEqual(expected, computed) + self.assertEqual(expected_string, str(self._clf)) def test_is_a_sklearn_classifier(self): import warnings from sklearn.exceptions import ConvergenceWarning - warnings.filterwarnings('ignore', category=ConvergenceWarning) - warnings.filterwarnings('ignore', category=RuntimeWarning) + + warnings.filterwarnings("ignore", category=ConvergenceWarning) + warnings.filterwarnings("ignore", category=RuntimeWarning) from sklearn.utils.estimator_checks import check_estimator + check_estimator(Stree()) def test_exception_if_C_is_negative(self): tclf = Stree(C=-1) with self.assertRaises(ValueError): - tclf.fit(*self._get_Xy()) + tclf.fit(*get_dataset(self._random_state)) def test_check_max_depth_is_positive_or_None(self): tcl = Stree() @@ -270,12 +319,12 @@ class Stree_test(unittest.TestCase): self.assertGreaterEqual(1, tcl.max_depth) with self.assertRaises(ValueError): tcl = Stree(max_depth=-1) - tcl.fit(*self._get_Xy()) + tcl.fit(*get_dataset(self._random_state)) def test_check_max_depth(self): depth = 3 tcl = Stree(random_state=self._random_state, max_depth=depth) - tcl.fit(*self._get_Xy()) + tcl.fit(*get_dataset(self._random_state)) self.assertEqual(depth, tcl.depth_) def test_unfitted_tree_is_iterable(self): @@ -284,13 +333,13 @@ class Stree_test(unittest.TestCase): class Snode_test(unittest.TestCase): - def __init__(self, *args, **kwargs): - os.environ['TESTING'] = '1' + os.environ["TESTING"] = "1" self._random_state = 1 - self._clf = Stree(random_state=self._random_state, - use_predictions=True) - self._clf.fit(*self._get_Xy()) + self._clf = Stree( + random_state=self._random_state, use_predictions=True + ) + self._clf.fit(*get_dataset(self._random_state)) super().__init__(*args, **kwargs) @classmethod @@ -298,18 +347,10 @@ class Snode_test(unittest.TestCase): """[summary] """ try: - os.environ.pop('TESTING') + os.environ.pop("TESTING") except KeyError: pass - def _get_Xy(self): - X, y = make_classification(n_samples=1500, n_features=3, - n_informative=3, n_redundant=0, n_classes=2, - n_repeated=0, n_clusters_per_class=2, - class_sep=1.5, flip_y=0, weights=[0.5, 0.5], - random_state=self._random_state) - return X, y - def test_attributes_in_leaves(self): """Check if the attributes in leaves have correct values so they form a predictor @@ -328,7 +369,7 @@ class Snode_test(unittest.TestCase): try: belief = max_card / (max_card + min_card) except ZeroDivisionError: - belief = 0. 
+ belief = 0.0 else: belief = 1 self.assertEqual(belief, node._belief) @@ -355,3 +396,16 @@ class Snode_test(unittest.TestCase): run_tree(node.get_up()) run_tree(self._clf.tree_) + + def test_make_predictor_on_leaf(self): + test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], "test") + test.make_predictor() + self.assertEqual(1, test._class) + self.assertEqual(0.75, test._belief) + + def test_make_predictor_on_not_leaf(self): + test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], "test") + test.set_up(Snode(None, [1], [1], "another_test")) + test.make_predictor() + self.assertIsNone(test._class) + self.assertEqual(0, test._belief) diff --git a/stree/tests/__init__.py b/stree/tests/__init__.py index dca330d..e573e8e 100644 --- a/stree/tests/__init__.py +++ b/stree/tests/__init__.py @@ -1 +1,9 @@ -from .Strees_test import Stree_test, Snode_test \ No newline at end of file +from .Strees_test import Stree_test, Snode_test +from .Strees_grapher_test import Stree_grapher_test, Snode_graph_test + +__all__ = [ + "Stree_test", + "Snode_test", + "Stree_grapher_test", + "Snode_graph_test", +]
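As a closing note on the iterator changes above: the new test_iterator_and_str case pins down that iterating a fitted Stree yields its nodes in preorder (root, then the Down subtree, then the Up subtree) and that str(clf) is simply those node descriptions joined by newlines, which is how __str__ in Strees.py builds its output. A minimal sketch of that usage follows; the dataset and hyper-parameters are arbitrary and not taken from the patch.

# Illustrative sketch only; dataset and parameters are not from the patch.
from sklearn.datasets import make_classification
from stree import Stree

X, y = make_classification(n_samples=300, random_state=1)
clf = Stree(C=0.01, random_state=1).fit(X, y)
nodes = [str(node) for node in clf]  # preorder traversal of the tree
assert str(clf) == "".join(line + "\n" for line in nodes)
print(str(clf))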