Make Stree a sklearn estimator

Added check_estimator in notebook test2
Added a Stree test with check_estimator
This commit is contained in:
2020-05-25 19:51:39 +02:00
parent 5956cd0cd2
commit e95bd9697a
5 changed files with 109 additions and 48 deletions

View File

@@ -15,6 +15,7 @@ from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.svm import LinearSVC
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
class Snode:
def __init__(self, clf: LinearSVC, X: np.ndarray, y: np.ndarray, title: str):
self._clf = clf
@@ -22,7 +23,7 @@ class Snode:
self._interceptor = 0. if clf is None else clf.intercept_
self._title = title
self._belief = 0. # belief of the prediction in a leaf node based on samples
# Only store dataset in Testing
# Only store dataset in Testing
self._X = X if os.environ.get('TESTING', 'NS') != 'NS' else None
self._y = y
self._down = None
@@ -97,24 +98,27 @@ class Siterator:
self._push(node.get_down())
return node
class Stree(BaseEstimator, ClassifierMixin):
"""
"""
__folder = 'data/'
def __init__(self, C: float = 1.0, max_iter: int = 1000,
             random_state: int = 0, use_predictions: bool = False):
    """Store the hyperparameters under their own names and nothing else.

    :param C: passed as ``C`` to the LinearSVC trained at each node
    :param max_iter: passed as ``max_iter`` to the LinearSVC trained at each node
    :param random_state: passed as ``random_state`` to the LinearSVC trained
        at each node
    :param use_predictions: when true, ``_split_data`` splits the samples
        using the node classifier's ``predict`` output
    """
    # get_params reads exactly these public attributes, so the constructor
    # must not keep private copies (_C, _max_iter, ...) or any fitted state;
    # the tree itself is created by fit() and stored in tree_.
    self.max_iter = max_iter
    self.C = C
    self.random_state = random_state
    self.use_predictions = use_predictions
def get_params(self, deep=True):
    """Return the hyperparameters and their current values, as sklearn expects.

    :param deep: accepted for compatibility with the sklearn signature;
        it is not used here
    :return: dict mapping each hyperparameter name to its value
    """
    # Read the public attributes only: the earlier return built the dict from
    # self._C / self._random_state / self._max_iter and left out
    # use_predictions, which made this complete version unreachable.
    return {
        'C': self.C,
        'random_state': self.random_state,
        'max_iter': self.max_iter,
        'use_predictions': self.use_predictions
    }
def set_params(self, **parameters):
"""Set hyperparameters as specified by sklearn, needed in grid searches
@@ -123,12 +127,16 @@ class Stree(BaseEstimator, ClassifierMixin):
setattr(self, parameter, value)
return self
# sklearn's check_estimator requires the binary_only tag to be declared
def _more_tags(self):
    """Return the extra estimator tags: just ``binary_only`` set to True."""
    extra_tags = {}
    extra_tags['binary_only'] = True
    return extra_tags
def _linear_function(self, data: np.ndarray, node: 'Snode') -> np.ndarray:
    """Evaluate the node's linear function on every row of ``data``.

    Only the first row of ``node._vector`` and the first entry of
    ``node._interceptor`` are used.

    :param data: 2-D array of samples, one per row
    :param node: tree node providing ``_vector`` and ``_interceptor``
    :return: ``data`` times the coefficient row plus the intercept; one row
        per sample when ``_vector`` has one coefficient per feature
    """
    # Reshape the coefficient row so its width matches the number of columns
    # in data before taking the dot product.
    coef = node._vector[0, :].reshape(-1, data.shape[1])
    return data.dot(coef.T) + node._interceptor[0]
def _split_data(self, node: Snode, data: np.ndarray, indices: np.ndarray) -> list:
if self.__use_predictions:
if self.use_predictions:
yp = node._clf.predict(data)
down = (yp == 1).reshape(-1, 1)
res = np.expand_dims(node._clf.decision_function(data), 1)
@@ -147,11 +155,16 @@ class Stree(BaseEstimator, ClassifierMixin):
return [data_up, indices_up, data_down, indices_down, res_up, res_down]
def fit(self, X: np.ndarray, y: np.ndarray, title: str = 'root') -> 'Stree':
    """Validate the training data and build the tree from it.

    :param X: training samples
    :param y: target labels; a numpy array is flattened before validation
    :param title: label given to the root node of the tree
    :return: the fitted estimator (self)
    """
    from sklearn.utils.multiclass import check_classification_targets
    # type(y).__name__ is 'ndarray', so the former comparison against
    # 'np.ndarray' never matched and y was never flattened here.
    if isinstance(y, np.ndarray):
        y = y.ravel()
    X, y = check_X_y(X, y)
    # Validate the targets before storing any fitted attribute, so a
    # rejected y does not leave a half-fitted estimator behind.
    check_classification_targets(y)
    self.classes_ = np.unique(y)
    self.n_iter_ = self.max_iter
    self.n_features_in_ = X.shape[1]
    self.tree_ = self.train(X, y, title)
    self._build_predictor()
    return self
def _build_predictor(self):
@@ -165,15 +178,15 @@ class Stree(BaseEstimator, ClassifierMixin):
run_tree(node.get_down())
run_tree(node.get_up())
run_tree(self._tree)
run_tree(self.tree_)
def train(self, X: np.ndarray, y: np.ndarray, title: str = 'root') -> Snode:
if np.unique(y).shape[0] == 1:
# only 1 class => pure dataset
return Snode(None, X, y, title + ', <pure>')
# Train the model
clf = LinearSVC(max_iter=self._max_iter, C=self._C,
random_state=self._random_state)
clf = LinearSVC(max_iter=self.max_iter, C=self.C,
random_state=self.random_state)
clf.fit(X, y)
tree = Snode(clf, X, y, title)
X_U, y_u, X_D, y_d, _, _ = self._split_data(tree, X, y)
@@ -184,8 +197,13 @@ class Stree(BaseEstimator, ClassifierMixin):
tree.set_down(self.train(X_D, y_d, title + ' - Down'))
return tree
def _reorder_results(self, y: np.array, indices: np.array) -> np.array:
y_ordered = np.zeros(y.shape, dtype=int if y.ndim == 1 else float)
def _reorder_results(self, y: np.array, indices: np.array, proba=False) -> np.array:
if proba:
# if predict_proba return np.array of floats
y_ordered = np.zeros(y.shape, dtype=float)
else:
# return array of same type given in y
y_ordered = y.copy()
indices = indices.astype(int)
for i, index in enumerate(indices):
y_ordered[index] = y[i]
@@ -205,17 +223,15 @@ class Stree(BaseEstimator, ClassifierMixin):
return np.append(k, m), np.append(l, n)
# sklearn check
check_is_fitted(self)
check_is_fitted(self, ['tree_'])
# Input validation
X = check_array(X)
# setup prediction & make it happen
indices = np.arange(X.shape[0])
return self._reorder_results(*predict_class(X, indices, self._tree))
return self._reorder_results(*predict_class(X, indices, self.tree_)).ravel()
def predict_proba(self, X: np.array) -> np.array:
"""Computes an approximation of the probability of samples belonging to class 1
(nothing more, nothing less)
"""Computes an approximation of the probability of samples belonging to class 0 and 1
:param X: dataset
:type X: np.array
"""
@@ -247,29 +263,31 @@ class Stree(BaseEstimator, ClassifierMixin):
return np.append(k, m), np.append(l, n)
# sklearn check
check_is_fitted(self)
check_is_fitted(self, ['tree_'])
# Input validation
X = check_array(X)
# setup prediction & make it happen
indices = np.arange(X.shape[0])
result, indices = predict_class(X, indices, [], self._tree)
empty_dist = np.empty((X.shape[0], 1), dtype=float)
result, indices = predict_class(X, indices, empty_dist, self.tree_)
result = result.reshape(X.shape[0], 2)
# Turn distances to hyperplane into probabilities based on fitting distances
# of samples to its hyperplane that classified them, to the sigmoid function
result[:, 1] = 1 / (1 + np.exp(-result[:, 1]))
return self._reorder_results(result, indices)
result[:, 1] = 1 / (1 + np.exp(-result[:, 1])) # Probability of being 1
result[:, 0] = 1 - result[:, 1] # Probability of being 0
return self._reorder_results(result, indices, proba=True)
def score(self, X: np.ndarray, y: np.ndarray) -> float:
    """Return the accuracy of ``predict(X)`` measured against ``y``.

    :param X: samples to classify
    :param y: expected labels; predictions are reshaped to ``y.shape``
        before comparing
    :return: number of matching predictions divided by ``len(y)``
    """
    # sklearn check: an unfitted estimator is rejected here instead of being
    # silently trained on the very data it is about to be scored with.
    check_is_fitted(self)
    yp = self.predict(X).reshape(y.shape)
    right = (yp == y).astype(int)
    return np.sum(right) / len(y)
def __iter__(self):
    """Return a ``Siterator`` built from the fitted tree stored in ``tree_``."""
    # fit() stores the tree in tree_; the old _tree attribute is no longer set.
    return Siterator(self.tree_)
def __str__(self) -> str:
output = ''
@@ -305,7 +323,5 @@ class Stree(BaseEstimator, ClassifierMixin):
if not os.path.isdir(self.__folder):
os.mkdir(self.__folder)
with open(self.get_catalog_name(), 'w', encoding='utf-8') as catalog:
self._save_datasets(self._tree, catalog, 1)
self._save_datasets(self.tree_, catalog, 1)