First implementation

2022-11-18 23:09:11 +01:00
parent 5a6ffe0651
commit 0a7c56f339
5 changed files with 184 additions and 68 deletions

View File

@@ -1,4 +1,4 @@
-from .bayesclass import TAN, KDB
+from .bayesclass import TAN, KDB, AODE
 from ._version import __version__

 __author__ = "Ricardo Montañana Gómez"
@@ -6,4 +6,4 @@ __copyright__ = "Copyright 2020-2023, Ricardo Montañana Gómez"
 __license__ = "MIT License"
 __author_email__ = "ricardo.montanana@alu.uclm.es"
-__all__ = ["TAN", "KDB", "__version__"]
+__all__ = ["TAN", "KDB", "AODE", "__version__"]

View File

@@ -4,7 +4,9 @@ This is a module to be used as a reference for building other modules
 import random
 import numpy as np
 import pandas as pd
+from scipy.stats import mode
 from sklearn.base import ClassifierMixin, BaseEstimator
+from sklearn.ensemble import BaseEnsemble
 from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
 from sklearn.utils.multiclass import unique_labels
 from sklearn.feature_selection import mutual_info_classif
@@ -37,6 +39,29 @@ class BayesBase(BaseEstimator, ClassifierMixin):
"""To keep compatiblity with the benchmark platform""" """To keep compatiblity with the benchmark platform"""
return 0, 0 return 0, 0
def _check_params_fit(self, X, y, expected_args, kwargs):
"""Check the common parameters passed to fit"""
# Check that X and y have correct shape
X, y = check_X_y(X, y)
# Store the classes seen during fit
self.classes_ = unique_labels(y)
self.n_classes_ = self.classes_.shape[0]
# Default values
self.class_name_ = "class"
self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
for key, value in kwargs.items():
if key in expected_args:
setattr(self, f"{key}_", value)
else:
raise ValueError(f"Unexpected argument: {key}")
if self.random_state is not None:
random.seed(self.random_state)
if len(self.features_) != X.shape[1]:
raise ValueError(
"Number of features does not match the number of columns in X"
)
return X, y
def fit(self, X, y, **kwargs): def fit(self, X, y, **kwargs):
"""A reference implementation of a fitting function for a classifier. """A reference implementation of a fitting function for a classifier.
@@ -75,7 +100,7 @@ class BayesBase(BaseEstimator, ClassifierMixin):
         >>> model.fit(train_data, train_y, features=features, class_name='E')
         TAN(random_state=17)
         """
-        X_, y_ = self._check_params_fit(X, y, kwargs)
+        X_, y_ = self._check_params(X, y, kwargs)
         # Store the information needed to build the model
         self.X_ = X_
         self.y_ = y_
@@ -89,6 +114,16 @@ class BayesBase(BaseEstimator, ClassifierMixin):
         # Return the classifier
         return self

+    def _train(self):
+        self.model_ = BayesianNetwork(
+            self.dag_.edges(), show_progress=self.show_progress
+        )
+        self.model_.fit(
+            self.dataset_,
+            estimator=BayesianEstimator,
+            prior_type="K2",
+        )
+
     def predict(self, X):
         """A reference implementation of a prediction for a classifier.
@@ -193,30 +228,12 @@ class TAN(BayesBase):
             show_progress=show_progress, random_state=random_state
         )

-    def _check_params_fit(self, X, y, kwargs):
-        """Check the parameters passed to fit"""
-        # Check that X and y have correct shape
-        X, y = check_X_y(X, y)
-        # Store the classes seen during fit
-        self.classes_ = unique_labels(y)
-        # Default values
-        self.class_name_ = "class"
-        self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
+    def _check_params(self, X, y, kwargs):
         self.head_ = 0
         expected_args = ["class_name", "features", "head"]
-        for key, value in kwargs.items():
-            if key in expected_args:
-                setattr(self, f"{key}_", value)
-            else:
-                raise ValueError(f"Unexpected argument: {key}")
-        if self.random_state is not None:
-            random.seed(self.random_state)
+        X, y = self._check_params_fit(X, y, expected_args, kwargs)
         if self.head_ == "random":
             self.head_ = random.randint(0, len(self.features_) - 1)
-        if len(self.features_) != X.shape[1]:
-            raise ValueError(
-                "Number of features does not match the number of columns in X"
-            )
         if self.head_ is not None and self.head_ >= len(self.features_):
             raise ValueError("Head index out of range")
         return X, y
@@ -229,16 +246,6 @@ class TAN(BayesBase):
             show_progress=self.show_progress,
         )

-    def _train(self):
-        self.model_ = BayesianNetwork(
-            self.dag_.edges(), show_progress=self.show_progress
-        )
-        self.model_.fit(
-            self.dataset_,
-            estimator=BayesianEstimator,
-            prior_type="K2",
-        )
-

 class KDB(BayesBase):
     def __init__(self, k, theta=0.03, show_progress=False, random_state=None):
@@ -248,29 +255,9 @@ class KDB(BayesBase):
             show_progress=show_progress, random_state=random_state
         )

-    def _check_params_fit(self, X, y, kwargs):
-        """Check the parameters passed to fit"""
-        # Check that X and y have correct shape
-        X, y = check_X_y(X, y)
-        # Store the classes seen during fit
-        self.classes_ = unique_labels(y)
-        # Default values
-        self.class_name_ = "class"
-        self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
-        self.head_ = 0
+    def _check_params(self, X, y, kwargs):
         expected_args = ["class_name", "features"]
-        for key, value in kwargs.items():
-            if key in expected_args:
-                setattr(self, f"{key}_", value)
-            else:
-                raise ValueError(f"Unexpected argument: {key}")
-        if self.random_state is not None:
-            random.seed(self.random_state)
-        if len(self.features_) != X.shape[1]:
-            raise ValueError(
-                "Number of features does not match the number of columns in X"
-            )
-        return X, y
+        return self._check_params_fit(X, y, expected_args, kwargs)

     def _build(self):
         """
@@ -335,12 +322,56 @@ class KDB(BayesBase):
                     S_nodes.append(idx)
         self.dag_ = dag

+
+class AODE(BayesBase, BaseEnsemble):
+    def __init__(self, min_data=30, show_progress=False, random_state=None):
+        super().__init__(
+            show_progress=show_progress, random_state=random_state
+        )
+        self.min_data = min_data
+
+    def _check_params(self, X, y, kwargs):
+        expected_args = ["class_name", "features"]
+        return self._check_params_fit(X, y, expected_args, kwargs)
+
+    def _build(self):
+        self.dag_ = None
+
     def _train(self):
-        self.model_ = BayesianNetwork(
-            self.dag_.edges(), show_progress=self.show_progress
-        )
-        self.model_.fit(
-            self.dataset_,
-            estimator=BayesianEstimator,
-            prior_type="K2",
-        )
+        """Build SPODE estimators (Super Parent One Dependent Estimator)"""
+        self.models_ = []
+        class_edges = [(self.class_name_, f) for f in self.features_]
+        for idx in range(len(self.features_)):
+            feature_edges = [
+                (self.features_[idx], f)
+                for f in self.features_
+                if f != self.features_[idx]
+            ]
+            feature_edges.extend(class_edges)
+            model = BayesianNetwork(
+                feature_edges, show_progress=self.show_progress
+            )
+            model.fit(
+                self.dataset_,
+                estimator=BayesianEstimator,
+                prior_type="K2",
+            )
+            self.models_.append(model)
+
+    def plot(self, title=""):
+        for idx, model in enumerate(self.models_):
+            self.model_ = model
+            super().plot(title=f"{idx} {title}")
+
+    def predict(self, X: np.ndarray) -> np.ndarray:
+        check_is_fitted(self, ["X_", "y_", "fitted_"])
+        # Input validation
+        X = self._validate_data(X, reset=False)
+        n_samples = X.shape[0]
+        n_estimators = len(self.models_)
+        result = np.empty((n_samples, n_estimators))
+        dataset = pd.DataFrame(X, columns=self.features_, dtype="int16")
+        for index, model in enumerate(self.models_):
+            result[:, index] = model.predict(dataset).values.ravel()
+        return mode(result, axis=1).mode.ravel()
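
A minimal usage sketch of the AODE class added above, assuming the `from bayesclass import AODE` export and the discretized integer features used in the tests; variable names here are illustrative only.

from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer
from bayesclass import AODE

# Discretize the continuous iris features to ordinal integers, as the tests do
X, y = load_iris(return_X_y=True)
X = KBinsDiscretizer(encode="ordinal").fit_transform(X)

# One SPODE network is built per feature (that feature as super parent);
# predict() returns the per-sample mode of the individual estimators' votes
clf = AODE(random_state=17)
clf.fit(X, y, features=[f"feature_{i}" for i in range(X.shape[1])])
y_pred = clf.predict(X)
print(f"{(y == y_pred).sum()} of {len(y)} training samples classified correctly")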

View File

@@ -0,0 +1,83 @@
+import pytest
+import numpy as np
+from sklearn.datasets import load_iris
+from sklearn.preprocessing import KBinsDiscretizer
+from matplotlib.testing.decorators import image_comparison
+from matplotlib.testing.conftest import mpl_test_settings
+
+from bayesclass import AODE
+from .._version import __version__
+
+
+@pytest.fixture
+def data():
+    X, y = load_iris(return_X_y=True)
+    enc = KBinsDiscretizer(encode="ordinal")
+    return enc.fit_transform(X), y
+
+
+@pytest.fixture
+def clf():
+    return AODE()
+
+
+def test_AODE_default_hyperparameters(data, clf):
+    # Test default values of hyperparameters
+    assert not clf.show_progress
+    assert clf.random_state is None
+    assert clf.min_data == 30
+    clf = AODE(show_progress=True, random_state=17, min_data=3)
+    assert clf.show_progress
+    assert clf.random_state == 17
+    assert clf.min_data == 3
+    clf.fit(*data)
+    assert clf.class_name_ == "class"
+    assert clf.features_ == [
+        "feature_0",
+        "feature_1",
+        "feature_2",
+        "feature_3",
+    ]
+
+
+def test_AODE_version(clf):
+    """Check AODE version."""
+    assert __version__ == clf.version()
+
+
+def test_AODE_nodes_leaves(clf):
+    assert clf.nodes_leaves() == (0, 0)
+
+
+def test_AODE_classifier(data, clf):
+    clf.fit(*data)
+    attribs = ["classes_", "X_", "y_", "features_", "class_name_"]
+    for attr in attribs:
+        assert hasattr(clf, attr)
+    X = data[0]
+    y = data[1]
+    y_pred = clf.predict(X)
+    assert y_pred.shape == (X.shape[0],)
+    assert sum(y == y_pred) == 147
+
+
+def test_AODE_wrong_num_features(data, clf):
+    with pytest.raises(
+        ValueError,
+        match="Number of features does not match the number of columns in X",
+    ):
+        clf.fit(*data, features=["feature_1", "feature_2"])
+
+
+def test_AODE_wrong_hyperparam(data, clf):
+    with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
+        clf.fit(*data, wrong_param="wrong_param")
+
+
+def test_AODE_error_size_predict(data, clf):
+    X, y = data
+    clf.fit(X, y)
+    with pytest.raises(ValueError):
+        X_diff_size = np.ones((10, X.shape[1] + 1))
+        clf.predict(X_diff_size)

View File

@@ -30,6 +30,7 @@ def test_KDB_default_hyperparameters(data, clf):
     clf = KDB(show_progress=True, random_state=17, k=3)
     assert clf.show_progress
     assert clf.random_state == 17
+    assert clf.k == 3
     clf.fit(*data)
     assert clf.class_name_ == "class"
     assert clf.features_ == [
@@ -41,7 +42,7 @@ def test_KDB_default_hyperparameters(data, clf):
 def test_KDB_version(clf):
-    """Check TAN version."""
+    """Check KDB version."""
     assert __version__ == clf.version()
@@ -58,7 +59,7 @@ def test_KDB_classifier(data, clf):
     y = data[1]
     y_pred = clf.predict(X)
     assert y_pred.shape == (X.shape[0],)
-    assert sum(y == y_pred) == 147
+    assert sum(y == y_pred) == 148

 @image_comparison(

View File

@@ -2,12 +2,13 @@ import pytest
 from sklearn.utils.estimator_checks import check_estimator
-from bayesclass import TAN
+from bayesclass import TAN, KDB, AODE


-@pytest.mark.parametrize("estimator", [TAN()])
+@pytest.mark.parametrize("estimator", [TAN(), KDB(k=2), AODE()])
+# @pytest.mark.parametrize("estimator", [AODE()])
 def test_all_estimators(estimator):
     i = 0
     for estimator, test in check_estimator(estimator, generate_only=True):
-        print(i := i + 1, test, "classes_")
+        print(i := i + 1, test)
         # test(estimator)