#2 - Add gini and entropy measures

rename get_dataset to load_dataset
add features and impurity to  __str__ of node
This commit is contained in:
2020-06-14 03:08:55 +02:00
parent ae1c199e21
commit f1ee4de37b
5 changed files with 118 additions and 98 deletions

67
main.py
View File

@@ -1,72 +1,15 @@
import time
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from stree import Stree
random_state = 1
X, y = load_iris(return_X_y=True)
def load_creditcard(n_examples=0):
import pandas as pd
import numpy as np
import random
df = pd.read_csv("data/creditcard.csv")
print(
"Fraud: {0:.3f}% {1}".format(
df.Class[df.Class == 1].count() * 100 / df.shape[0],
df.Class[df.Class == 1].count(),
)
)
print(
"Valid: {0:.3f}% {1}".format(
df.Class[df.Class == 0].count() * 100 / df.shape[0],
df.Class[df.Class == 0].count(),
)
)
y = np.expand_dims(df.Class.values, axis=1)
X = df.drop(["Class", "Time", "Amount"], axis=1).values
if n_examples > 0:
# Take first n_examples samples
X = X[:n_examples, :]
y = y[:n_examples, :]
else:
# Take all the positive samples with a number of random negatives
if n_examples < 0:
Xt = X[(y == 1).ravel()]
yt = y[(y == 1).ravel()]
indices = random.sample(range(X.shape[0]), -1 * n_examples)
X = np.append(Xt, X[indices], axis=0)
y = np.append(yt, y[indices], axis=0)
print("X.shape", X.shape, " y.shape", y.shape)
print(
"Fraud: {0:.3f}% {1}".format(
len(y[y == 1]) * 100 / X.shape[0], len(y[y == 1])
)
)
print(
"Valid: {0:.3f}% {1}".format(
len(y[y == 0]) * 100 / X.shape[0], len(y[y == 0])
)
)
Xtrain, Xtest, ytrain, ytest = train_test_split(
X,
y,
train_size=0.7,
shuffle=True,
random_state=random_state,
stratify=y,
)
return Xtrain, Xtest, ytrain, ytest
# data = load_creditcard(-5000) # Take all true samples + 5000 of the others
# data = load_creditcard(5000) # Take the first 5000 samples
data = load_creditcard() # Take all the samples
Xtrain = data[0]
Xtest = data[1]
ytrain = data[2]
ytest = data[3]
Xtrain, Xtest, ytrain, ytest = train_test_split(
X, y, test_size=0.2, random_state=random_state
)
now = time.time()
clf = Stree(C=0.01, random_state=random_state)

View File

@@ -29,7 +29,15 @@ class Snode:
dataset assigned to it
"""
def __init__(self, clf: SVC, X: np.ndarray, y: np.ndarray, title: str):
def __init__(
self,
clf: SVC,
X: np.ndarray,
y: np.ndarray,
features: np.array,
impurity: float,
title: str,
):
self._clf = clf
self._title = title
self._belief = 0.0
@@ -39,10 +47,21 @@ class Snode:
self._down = None
self._up = None
self._class = None
self._feature = None
self._sample_weight = None
self._features = features
self._impurity = impurity
@classmethod
def copy(cls, node: "Snode") -> "Snode":
return cls(node._clf, node._X, node._y, node._title)
return cls(
node._clf,
node._X,
node._y,
node._features,
node._impurity,
node._title,
)
def set_down(self, son):
self._down = son
@@ -83,11 +102,15 @@ class Snode:
count_values = np.unique(self._y, return_counts=True)
result = (
f"{self._title} - Leaf class={self._class} belief="
f"{self._belief: .6f} counts={count_values}"
f"{self._belief: .6f} impurity={self._impurity:.4f} "
f"counts={count_values}"
)
return result
else:
return f"{self._title}"
return (
f"{self._title} feaures={self._features} impurity="
f"{self._impurity:.4f}"
)
class Siterator:
@@ -130,6 +153,7 @@ class Stree(BaseEstimator, ClassifierMixin):
degree: int = 3,
gamma="scale",
split_criteria: str = "max_samples",
criterion: str = "gini",
min_samples_split: int = 0,
max_features=None,
):
@@ -144,6 +168,7 @@ class Stree(BaseEstimator, ClassifierMixin):
self.min_samples_split = min_samples_split
self.split_criteria = split_criteria
self.max_features = max_features
self.criterion = criterion
def _more_tags(self) -> dict:
"""Required by sklearn to supply features of the classifier
@@ -251,6 +276,10 @@ class Stree(BaseEstimator, ClassifierMixin):
f"split_criteria has to be min_distance or \
max_samples got ({self.split_criteria})"
)
if self.criterion not in ["gini", "entropy"]:
raise ValueError(
f"criterion must be gini or entropy got({self.criterion})"
)
check_classification_targets(y)
X, y = check_X_y(X, y)
@@ -263,6 +292,7 @@ class Stree(BaseEstimator, ClassifierMixin):
self.depth_ = 0
self.n_features_ = X.shape[1]
self.max_features_ = self._initialize_max_features()
self.criterion_function_ = getattr(self, f"_{self.criterion}")
self.tree_ = self.train(X, y, sample_weight, 1, "root")
self._build_predictor()
return self
@@ -296,12 +326,20 @@ class Stree(BaseEstimator, ClassifierMixin):
return None
if np.unique(y).shape[0] == 1:
# only 1 class => pure dataset
return Snode(None, X, y, title + ", <pure>")
return Snode(
clf=None,
X=X,
y=y,
features=X.shape[1],
impurity=0.0,
title=title + ", <pure>",
)
# Train the model
clf = self._build_clf()
Xs, indices_subset = self._get_subspace(X)
clf.fit(Xs, y, sample_weight=sample_weight)
node = Snode(clf, Xs, y, title)
impurity = self.criterion_function_(y)
node = Snode(clf, X, y, indices_subset, impurity, title)
self.depth_ = max(depth, self.depth_)
down = self._split_criteria(self._distances(node, Xs), node)
X_U, X_D = self._split_array(X, down)
@@ -309,7 +347,14 @@ class Stree(BaseEstimator, ClassifierMixin):
sw_u, sw_d = self._split_array(sample_weight, down)
if X_U is None or X_D is None:
# didn't part anything
return Snode(clf, X, y, title + ", <cgaf>")
return Snode(
clf,
X,
y,
features=X.shape[1],
impurity=impurity,
title=title + ", <cgaf>",
)
node.set_up(self.train(X_U, y_u, sw_u, depth + 1, title + " - Up"))
node.set_down(self.train(X_D, y_d, sw_d, depth + 1, title + " - Down"))
return node
@@ -484,6 +529,17 @@ class Stree(BaseEstimator, ClassifierMixin):
)
return max_features
@staticmethod
def _gini(y: np.array) -> float:
_, count = np.unique(y, return_counts=True)
return 1 - np.sum(np.square(count / np.sum(count)))
@staticmethod
def _entropy(y: np.array) -> float:
_, count = np.unique(y, return_counts=True)
proportion = count / np.sum(count)
return -np.sum(proportion * np.log2(proportion))
def _get_subspace(self, dataset: np.array) -> list:
"""Return the best subspace to make a split
"""

View File

@@ -4,14 +4,14 @@ import unittest
import numpy as np
from stree import Stree, Snode
from .utils import get_dataset
from .utils import load_dataset
class Snode_test(unittest.TestCase):
def __init__(self, *args, **kwargs):
self._random_state = 1
self._clf = Stree(random_state=self._random_state)
self._clf.fit(*get_dataset(self._random_state))
self._clf.fit(*load_dataset(self._random_state))
super().__init__(*args, **kwargs)
@classmethod
@@ -63,27 +63,27 @@ class Snode_test(unittest.TestCase):
run_tree(self._clf.tree_)
def test_make_predictor_on_leaf(self):
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], "test")
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
test.make_predictor()
self.assertEqual(1, test._class)
self.assertEqual(0.75, test._belief)
def test_make_predictor_on_not_leaf(self):
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], "test")
test.set_up(Snode(None, [1], [1], "another_test"))
test = Snode(None, [1, 2, 3, 4], [1, 0, 1, 1], [], 0.0, "test")
test.set_up(Snode(None, [1], [1], [], 0.0, "another_test"))
test.make_predictor()
self.assertIsNone(test._class)
self.assertEqual(0, test._belief)
def test_make_predictor_on_leaf_bogus_data(self):
test = Snode(None, [1, 2, 3, 4], [], "test")
test = Snode(None, [1, 2, 3, 4], [], [], 0.0, "test")
test.make_predictor()
self.assertIsNone(test._class)
def test_copy_node(self):
px = [1, 2, 3, 4]
py = [1]
test = Snode(Stree(), px, py, "test")
test = Snode(Stree(), px, py, [], 0.0, "test")
computed = Snode.copy(test)
self.assertListEqual(computed._X, px)
self.assertListEqual(computed._y, py)

View File

@@ -5,7 +5,7 @@ import numpy as np
from sklearn.datasets import load_iris
from stree import Stree, Snode
from .utils import get_dataset
from .utils import load_dataset
class Stree_test(unittest.TestCase):
@@ -64,7 +64,7 @@ class Stree_test(unittest.TestCase):
warnings.filterwarnings("ignore")
for kernel in self._kernels:
clf = Stree(kernel=kernel, random_state=self._random_state)
clf.fit(*get_dataset(self._random_state))
clf.fit(*load_dataset(self._random_state))
self._check_tree(clf.tree_)
def _find_out(
@@ -88,7 +88,7 @@ class Stree_test(unittest.TestCase):
return res
def test_single_prediction(self):
X, y = get_dataset(self._random_state)
X, y = load_dataset(self._random_state)
for kernel in self._kernels:
clf = Stree(kernel=kernel, random_state=self._random_state)
yp = clf.fit(X, y).predict((X[0, :].reshape(-1, X.shape[1])))
@@ -97,14 +97,14 @@ class Stree_test(unittest.TestCase):
def test_multiple_prediction(self):
# First 27 elements the predictions are the same as the truth
num = 27
X, y = get_dataset(self._random_state)
X, y = load_dataset(self._random_state)
for kernel in self._kernels:
clf = Stree(kernel=kernel, random_state=self._random_state)
yp = clf.fit(X, y).predict(X[:num, :])
self.assertListEqual(y[:num].tolist(), yp.tolist())
def test_score(self):
X, y = get_dataset(self._random_state)
X, y = load_dataset(self._random_state)
accuracies = [
0.9506666666666667,
0.9606666666666667,
@@ -123,7 +123,7 @@ class Stree_test(unittest.TestCase):
"""Check if predicting sample by sample gives the same result as
predicting all samples at once
"""
X, y = get_dataset(self._random_state)
X, y = load_dataset(self._random_state)
for kernel in self._kernels:
clf = Stree(kernel=kernel, random_state=self._random_state)
clf.fit(X, y)
@@ -141,22 +141,22 @@ class Stree_test(unittest.TestCase):
"""Check preorder iterator
"""
expected = [
"root",
"root - Down",
"root - Down - Down, <cgaf> - Leaf class=1 belief= 0.975989 counts"
"=(array([0, 1]), array([ 17, 691]))",
"root - Down - Up",
"root feaures=(0, 1, 2) impurity=0.5000",
"root - Down feaures=(0, 1, 2) impurity=0.0671",
"root - Down - Down, <cgaf> - Leaf class=1 belief= 0.975989 "
"impurity=0.0469 counts=(array([0, 1]), array([ 17, 691]))",
"root - Down - Up feaures=(0, 1, 2) impurity=0.3967",
"root - Down - Up - Down, <cgaf> - Leaf class=1 belief= 0.750000 "
"counts=(array([0, 1]), array([1, 3]))",
"impurity=0.3750 counts=(array([0, 1]), array([1, 3]))",
"root - Down - Up - Up, <pure> - Leaf class=0 belief= 1.000000 "
"counts=(array([0]), array([7]))",
"root - Up, <cgaf> - Leaf class=0 belief= 0.928297 counts=(array("
"[0, 1]), array([725, 56]))",
"impurity=0.0000 counts=(array([0]), array([7]))",
"root - Up, <cgaf> - Leaf class=0 belief= 0.928297 impurity=0.1331"
" counts=(array([0, 1]), array([725, 56]))",
]
computed = []
expected_string = ""
clf = Stree(kernel="linear", random_state=self._random_state)
clf.fit(*get_dataset(self._random_state))
clf.fit(*load_dataset(self._random_state))
for node in clf:
computed.append(str(node))
expected_string += str(node) + "\n"
@@ -176,12 +176,12 @@ class Stree_test(unittest.TestCase):
def test_exception_if_C_is_negative(self):
tclf = Stree(C=-1)
with self.assertRaises(ValueError):
tclf.fit(*get_dataset(self._random_state))
tclf.fit(*load_dataset(self._random_state))
def test_exception_if_bogus_split_criteria(self):
tclf = Stree(split_criteria="duck")
with self.assertRaises(ValueError):
tclf.fit(*get_dataset(self._random_state))
tclf.fit(*load_dataset(self._random_state))
def test_check_max_depth_is_positive_or_None(self):
tcl = Stree()
@@ -190,13 +190,13 @@ class Stree_test(unittest.TestCase):
self.assertGreaterEqual(1, tcl.max_depth)
with self.assertRaises(ValueError):
tcl = Stree(max_depth=-1)
tcl.fit(*get_dataset(self._random_state))
tcl.fit(*load_dataset(self._random_state))
def test_check_max_depth(self):
depths = (3, 4)
for depth in depths:
tcl = Stree(random_state=self._random_state, max_depth=depth)
tcl.fit(*get_dataset(self._random_state))
tcl.fit(*load_dataset(self._random_state))
self.assertEqual(depth, tcl.depth_)
def test_unfitted_tree_is_iterable(self):
@@ -230,7 +230,7 @@ class Stree_test(unittest.TestCase):
def test_muticlass_dataset(self):
datasets = {
"Synt": get_dataset(random_state=self._random_state, n_classes=3),
"Synt": load_dataset(random_state=self._random_state, n_classes=3),
"Iris": load_iris(return_X_y=True),
}
outcomes = {
@@ -339,3 +339,24 @@ class Stree_test(unittest.TestCase):
dataset[:, indices].tolist(), computed.tolist()
)
self.assertEqual(expected, len(indices))
def test_bogus_criterion(self):
clf = Stree(criterion="duck")
with self.assertRaises(ValueError):
clf.fit(*load_dataset())
def test_gini(self):
y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
expected = 0.48
self.assertEqual(expected, Stree._gini(y))
clf = Stree(criterion="gini")
clf.fit(*load_dataset())
self.assertEqual(expected, clf.criterion_function_(y))
def test_entropy(self):
y = [0, 1, 1, 1, 1, 1, 0, 0, 0, 1]
expected = 0.9709505944546686
self.assertAlmostEqual(expected, Stree._entropy(y))
clf = Stree(criterion="entropy")
clf.fit(*load_dataset())
self.assertEqual(expected, clf.criterion_function_(y))

View File

@@ -1,7 +1,7 @@
from sklearn.datasets import make_classification
def get_dataset(random_state=0, n_classes=2):
def load_dataset(random_state=0, n_classes=2):
X, y = make_classification(
n_samples=1500,
n_features=3,