mirror of
https://github.com/Doctorado-ML/STree.git
synced 2025-08-18 17:06:01 +00:00
Refactor split_data adding sample_weight
This commit is contained in:
115
stree/Strees.py
115
stree/Strees.py
@@ -13,7 +13,8 @@ import os
|
||||
import numpy as np
|
||||
from sklearn.base import BaseEstimator, ClassifierMixin
|
||||
from sklearn.svm import LinearSVC
|
||||
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
|
||||
from sklearn.utils.multiclass import check_classification_targets
|
||||
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted, _check_sample_weight, check_random_state
|
||||
|
||||
|
||||
class Snode:
|
||||
@@ -102,9 +103,8 @@ class Siterator:
|
||||
class Stree(BaseEstimator, ClassifierMixin):
|
||||
"""
|
||||
"""
|
||||
__folder = 'data/'
|
||||
|
||||
def __init__(self, C: float = 1.0, max_iter: int = 1000, random_state: int = 0,
|
||||
def __init__(self, C: float = 1.0, max_iter: int = 1000, random_state: int = None,
|
||||
max_depth: int=None, tol: float=1e-4, use_predictions: bool = False):
|
||||
self.max_iter = max_iter
|
||||
self.C = C
|
||||
@@ -145,25 +145,25 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
return origin[up[:, 0]] if any(up) else None, \
|
||||
origin[down[:, 0]] if any(down) else None
|
||||
|
||||
def _split_data(self, node: Snode, data: np.ndarray, indices: np.ndarray) -> list:
|
||||
def _distances(self, node: Snode, data: np.ndarray) -> np.array:
|
||||
if self.use_predictions:
|
||||
yp = node._clf.predict(data)
|
||||
down = (yp == 1).reshape(-1, 1)
|
||||
res = np.expand_dims(node._clf.decision_function(data), 1)
|
||||
else:
|
||||
# doesn't work with multiclass as each sample has to do inner product with its own coeficients
|
||||
# computes positition of every sample is w.r.t. the hyperplane
|
||||
res = self._linear_function(data, node)
|
||||
down = res > 0
|
||||
data_up, data_down = self._split_array(data, down)
|
||||
indices_up, indices_down = self._split_array(indices, down)
|
||||
res_up, res_down = self._split_array(res, down)
|
||||
return [data_up, indices_up, data_down, indices_down, res_up, res_down]
|
||||
# data_up, data_down = self._split_array(data, down)
|
||||
# indices_up, indices_down = self._split_array(indices, down)
|
||||
# res_up, res_down = self._split_array(res, down)
|
||||
# weight_up, weight_down = self._split_array(weights, down)
|
||||
#return [data_up, indices_up, data_down, indices_down, weight_up, weight_down, res_up, res_down]
|
||||
return res
|
||||
|
||||
def fit(self, X: np.ndarray, y: np.ndarray, weighted_samples: np.array=None, **fitparams: dict) -> 'Stree':
|
||||
from sklearn.utils.multiclass import check_classification_targets
|
||||
if fitparams is not None:
|
||||
self.set_params(**fitparams)
|
||||
def _split_criteria(self, data: np.array) -> np.array:
|
||||
return data > 0
|
||||
|
||||
def fit(self, X: np.ndarray, y: np.ndarray, sample_weight: np.array = None) -> 'Stree':
|
||||
# Check parameters are Ok.
|
||||
if type(y).__name__ == 'np.ndarray':
|
||||
y = y.ravel()
|
||||
if self.C < 0:
|
||||
@@ -173,12 +173,15 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
raise ValueError(f"Maximum depth has to be greater than 1... got (max_depth={self.max_depth})")
|
||||
check_classification_targets(y)
|
||||
X, y = check_X_y(X, y)
|
||||
sample_weight = _check_sample_weight(sample_weight, X)
|
||||
check_classification_targets(y)
|
||||
# Initialize computed parameters
|
||||
#self.random_state = check_random_state(self.random_state)
|
||||
self.classes_ = np.unique(y)
|
||||
self.n_iter_ = self.max_iter
|
||||
self.depth_ = 0
|
||||
check_classification_targets(y)
|
||||
self.n_features_in_ = X.shape[1]
|
||||
self.tree_ = self.train(X, y, 1, 'root')
|
||||
self.tree_ = self.train(X, y, sample_weight, 1, 'root')
|
||||
self._build_predictor()
|
||||
return self
|
||||
|
||||
@@ -195,7 +198,7 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
|
||||
run_tree(self.tree_)
|
||||
|
||||
def train(self, X: np.ndarray, y: np.ndarray, depth: int, title: str = 'root') -> Snode:
|
||||
def train(self, X: np.ndarray, y: np.ndarray, sample_weight: np.ndarray, depth: int, title: str) -> Snode:
|
||||
|
||||
if depth > self.__max_depth:
|
||||
return None
|
||||
@@ -203,21 +206,24 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
# only 1 class => pure dataset
|
||||
return Snode(None, X, y, title + ', <pure>')
|
||||
# Train the model
|
||||
clf = LinearSVC(max_iter=self.max_iter, C=self.C,
|
||||
random_state=self.random_state)
|
||||
clf.fit(X, y)
|
||||
clf = LinearSVC(max_iter=self.max_iter, random_state=self.random_state,
|
||||
C=self.C) #, sample_weight=sample_weight)
|
||||
clf.fit(X, y, sample_weight=sample_weight)
|
||||
tree = Snode(clf, X, y, title)
|
||||
self.depth_ = max(depth, self.depth_)
|
||||
X_U, y_u, X_D, y_d, _, _ = self._split_data(tree, X, y)
|
||||
down = self._split_criteria(self._distances(tree, X))
|
||||
X_U, X_D = self._split_array(X, down)
|
||||
y_u, y_d = self._split_array(y, down)
|
||||
sw_u, sw_d = self._split_array(sample_weight, down)
|
||||
if X_U is None or X_D is None:
|
||||
# didn't part anything
|
||||
return Snode(clf, X, y, title + ', <cgaf>')
|
||||
tree.set_up(self.train(X_U, y_u, depth + 1, title + ' - Up'))
|
||||
tree.set_down(self.train(X_D, y_d, depth + 1, title + ' - Down'))
|
||||
tree.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + ' - Up'))
|
||||
tree.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + ' - Down'))
|
||||
return tree
|
||||
|
||||
def _reorder_results(self, y: np.array, indices: np.array, proba=False) -> np.array:
|
||||
if proba:
|
||||
def _reorder_results(self, y: np.array, indices: np.array) -> np.array:
|
||||
if y.ndim > 1 and y.shape[1] > 1:
|
||||
# if predict_proba return np.array of floats
|
||||
y_ordered = np.zeros(y.shape, dtype=float)
|
||||
else:
|
||||
@@ -236,10 +242,12 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
# set a class for every sample in dataset
|
||||
prediction = np.full((xp.shape[0], 1), node._class)
|
||||
return prediction, indices
|
||||
u, i_u, d, i_d, _, _ = self._split_data(node, xp, indices)
|
||||
k, l = predict_class(d, i_d, node.get_down())
|
||||
m, n = predict_class(u, i_u, node.get_up())
|
||||
return np.append(k, m), np.append(l, n)
|
||||
down = self._split_criteria(self._distances(node, xp))
|
||||
X_U, X_D = self._split_array(xp, down)
|
||||
i_u, i_d = self._split_array(indices, down)
|
||||
prx_u, prin_u = predict_class(X_U, i_u, node.get_up())
|
||||
prx_d, prin_d = predict_class(X_D, i_d, node.get_down())
|
||||
return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
|
||||
|
||||
# sklearn check
|
||||
check_is_fitted(self, ['tree_'])
|
||||
@@ -276,10 +284,15 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
prediction = np.full((xp.shape[0], 1), node._class)
|
||||
prediction_proba = dist
|
||||
return np.append(prediction, prediction_proba, axis=1), indices
|
||||
u, i_u, d, i_d, r_u, r_d = self._split_data(node, xp, indices)
|
||||
k, l = predict_class(d, i_d, r_d, node.get_down())
|
||||
m, n = predict_class(u, i_u, r_u, node.get_up())
|
||||
return np.append(k, m), np.append(l, n)
|
||||
distances = self._distances(node, xp)
|
||||
down = self._split_criteria(distances)
|
||||
|
||||
X_U, X_D = self._split_array(xp, down)
|
||||
i_u, i_d = self._split_array(indices, down)
|
||||
di_u, di_d = self._split_array(distances, down)
|
||||
prx_u, prin_u = predict_class(X_U, i_u, di_u, node.get_up())
|
||||
prx_d, prin_d = predict_class(X_D, i_d, di_d, node.get_down())
|
||||
return np.append(prx_u, prx_d), np.append(prin_u, prin_d)
|
||||
|
||||
# sklearn check
|
||||
check_is_fitted(self, ['tree_'])
|
||||
@@ -295,7 +308,7 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
# Probability of being 1
|
||||
result[:, 1] = 1 / (1 + np.exp(-result[:, 1]))
|
||||
result[:, 0] = 1 - result[:, 1] # Probability of being 0
|
||||
return self._reorder_results(result, indices, proba=True)
|
||||
return self._reorder_results(result, indices)
|
||||
|
||||
def score(self, X: np.array, y: np.array) -> float:
|
||||
"""Return accuracy
|
||||
@@ -319,35 +332,3 @@ class Stree(BaseEstimator, ClassifierMixin):
|
||||
output += str(i) + '\n'
|
||||
return output
|
||||
|
||||
def get_folder(self) -> str:
|
||||
return self.__folder
|
||||
|
||||
def _save_datasets(self, tree: Snode, catalog: typing.TextIO, number: int):
|
||||
"""Save the dataset of the node in a csv file
|
||||
|
||||
:param tree: node with data to save
|
||||
:type tree: Snode
|
||||
:param catalog: catalog file handler
|
||||
:type catalog: typing.TextIO
|
||||
:param number: sequential number for the generated file name
|
||||
:type number: int
|
||||
"""
|
||||
data = np.append(tree._X, tree._y.reshape(-1, 1), axis=1)
|
||||
name = f"{self.__folder}dataset{number}.csv"
|
||||
np.savetxt(name, data, delimiter=",")
|
||||
catalog.write(f"{name}, - {str(tree)}")
|
||||
if tree.is_leaf():
|
||||
return
|
||||
self._save_datasets(tree.get_down(), catalog, number + 1)
|
||||
self._save_datasets(tree.get_up(), catalog, number + 2)
|
||||
|
||||
def get_catalog_name(self):
|
||||
return self.__folder + "catalog.txt"
|
||||
|
||||
def save_sub_datasets(self):
|
||||
"""Save the every dataset stored in the tree to check with manual classifier
|
||||
"""
|
||||
if not os.path.isdir(self.__folder):
|
||||
os.mkdir(self.__folder)
|
||||
with open(self.get_catalog_name(), 'w', encoding='utf-8') as catalog:
|
||||
self._save_datasets(self.tree_, catalog, 1)
|
||||
|
@@ -107,24 +107,6 @@ class Stree_test(unittest.TestCase):
|
||||
res.append(y_original[row])
|
||||
return res
|
||||
|
||||
def test_subdatasets(self):
|
||||
"""Check if the subdatasets files have the same labels as the original dataset
|
||||
"""
|
||||
self._clf.save_sub_datasets()
|
||||
with open(self._clf.get_catalog_name()) as cat_file:
|
||||
catalog = csv.reader(cat_file, delimiter=',')
|
||||
for row in catalog:
|
||||
X, y = self._get_Xy()
|
||||
x_file, y_file = self._get_file_data(row[0])
|
||||
y_original = np.array(self._find_out(x_file, X, y), dtype=int)
|
||||
self.assertTrue(np.array_equal(y_file, y_original))
|
||||
if os.path.isdir(self._clf.get_folder()):
|
||||
try:
|
||||
os.remove(f"{self._clf.get_folder()}*")
|
||||
os.rmdir(self._clf.get_folder())
|
||||
except:
|
||||
pass
|
||||
|
||||
def test_single_prediction(self):
|
||||
X, y = self._get_Xy()
|
||||
yp = self._clf.predict((X[0, :].reshape(-1, X.shape[1])))
|
||||
@@ -141,10 +123,9 @@ class Stree_test(unittest.TestCase):
|
||||
X, y = self._get_Xy()
|
||||
accuracy_score = self._clf.score(X, y)
|
||||
yp = self._clf.predict(X)
|
||||
right = (yp == y).astype(int)
|
||||
accuracy_computed = sum(right) / len(y)
|
||||
accuracy_computed = np.mean(yp == y)
|
||||
self.assertEqual(accuracy_score, accuracy_computed)
|
||||
self.assertGreater(accuracy_score, 0.8)
|
||||
self.assertGreater(accuracy_score, 0.9)
|
||||
|
||||
def test_single_predict_proba(self):
|
||||
"""Check that element 28 has a prediction different that the current label
|
||||
|
Reference in New Issue
Block a user