mirror of
https://github.com/Doctorado-ML/bayesclass.git
synced 2025-08-15 23:55:57 +00:00
@@ -1,4 +1,4 @@
|
||||
from .bayesclass import TAN, KDB
|
||||
from .bayesclass import TAN, KDB, AODE
|
||||
from ._version import __version__
|
||||
|
||||
__author__ = "Ricardo Montañana Gómez"
|
||||
@@ -6,4 +6,4 @@ __copyright__ = "Copyright 2020-2023, Ricardo Montañana Gómez"
|
||||
__license__ = "MIT License"
|
||||
__author_email__ = "ricardo.montanana@alu.uclm.es"
|
||||
|
||||
__all__ = ["TAN", "KDB", "__version__"]
|
||||
__all__ = ["TAN", "KDB", "AODE", "__version__"]
|
||||
|
@@ -4,7 +4,9 @@ This is a module to be used as a reference for building other modules
|
||||
import random
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from scipy.stats import mode
|
||||
from sklearn.base import ClassifierMixin, BaseEstimator
|
||||
from sklearn.ensemble import BaseEnsemble
|
||||
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
|
||||
from sklearn.utils.multiclass import unique_labels
|
||||
from sklearn.feature_selection import mutual_info_classif
|
||||
@@ -37,6 +39,29 @@ class BayesBase(BaseEstimator, ClassifierMixin):
|
||||
"""To keep compatiblity with the benchmark platform"""
|
||||
return 0, 0
|
||||
|
||||
def _check_params_fit(self, X, y, expected_args, kwargs):
|
||||
"""Check the common parameters passed to fit"""
|
||||
# Check that X and y have correct shape
|
||||
X, y = check_X_y(X, y)
|
||||
# Store the classes seen during fit
|
||||
self.classes_ = unique_labels(y)
|
||||
self.n_classes_ = self.classes_.shape[0]
|
||||
# Default values
|
||||
self.class_name_ = "class"
|
||||
self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
|
||||
for key, value in kwargs.items():
|
||||
if key in expected_args:
|
||||
setattr(self, f"{key}_", value)
|
||||
else:
|
||||
raise ValueError(f"Unexpected argument: {key}")
|
||||
if self.random_state is not None:
|
||||
random.seed(self.random_state)
|
||||
if len(self.features_) != X.shape[1]:
|
||||
raise ValueError(
|
||||
"Number of features does not match the number of columns in X"
|
||||
)
|
||||
return X, y
|
||||
|
||||
def fit(self, X, y, **kwargs):
|
||||
"""A reference implementation of a fitting function for a classifier.
|
||||
|
||||
@@ -75,7 +100,7 @@ class BayesBase(BaseEstimator, ClassifierMixin):
|
||||
>>> model.fit(train_data, train_y, features=features, class_name='E')
|
||||
TAN(random_state=17)
|
||||
"""
|
||||
X_, y_ = self._check_params_fit(X, y, kwargs)
|
||||
X_, y_ = self._check_params(X, y, kwargs)
|
||||
# Store the information needed to build the model
|
||||
self.X_ = X_
|
||||
self.y_ = y_
|
||||
@@ -89,6 +114,16 @@ class BayesBase(BaseEstimator, ClassifierMixin):
|
||||
# Return the classifier
|
||||
return self
|
||||
|
||||
def _train(self):
|
||||
self.model_ = BayesianNetwork(
|
||||
self.dag_.edges(), show_progress=self.show_progress
|
||||
)
|
||||
self.model_.fit(
|
||||
self.dataset_,
|
||||
estimator=BayesianEstimator,
|
||||
prior_type="K2",
|
||||
)
|
||||
|
||||
def predict(self, X):
|
||||
"""A reference implementation of a prediction for a classifier.
|
||||
|
||||
@@ -193,30 +228,12 @@ class TAN(BayesBase):
|
||||
show_progress=show_progress, random_state=random_state
|
||||
)
|
||||
|
||||
def _check_params_fit(self, X, y, kwargs):
|
||||
"""Check the parameters passed to fit"""
|
||||
# Check that X and y have correct shape
|
||||
X, y = check_X_y(X, y)
|
||||
# Store the classes seen during fit
|
||||
self.classes_ = unique_labels(y)
|
||||
# Default values
|
||||
self.class_name_ = "class"
|
||||
self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
|
||||
def _check_params(self, X, y, kwargs):
|
||||
self.head_ = 0
|
||||
expected_args = ["class_name", "features", "head"]
|
||||
for key, value in kwargs.items():
|
||||
if key in expected_args:
|
||||
setattr(self, f"{key}_", value)
|
||||
else:
|
||||
raise ValueError(f"Unexpected argument: {key}")
|
||||
if self.random_state is not None:
|
||||
random.seed(self.random_state)
|
||||
X, y = self._check_params_fit(X, y, expected_args, kwargs)
|
||||
if self.head_ == "random":
|
||||
self.head_ = random.randint(0, len(self.features_) - 1)
|
||||
if len(self.features_) != X.shape[1]:
|
||||
raise ValueError(
|
||||
"Number of features does not match the number of columns in X"
|
||||
)
|
||||
if self.head_ is not None and self.head_ >= len(self.features_):
|
||||
raise ValueError("Head index out of range")
|
||||
return X, y
|
||||
@@ -229,16 +246,6 @@ class TAN(BayesBase):
|
||||
show_progress=self.show_progress,
|
||||
)
|
||||
|
||||
def _train(self):
|
||||
self.model_ = BayesianNetwork(
|
||||
self.dag_.edges(), show_progress=self.show_progress
|
||||
)
|
||||
self.model_.fit(
|
||||
self.dataset_,
|
||||
estimator=BayesianEstimator,
|
||||
prior_type="K2",
|
||||
)
|
||||
|
||||
|
||||
class KDB(BayesBase):
|
||||
def __init__(self, k, theta=0.03, show_progress=False, random_state=None):
|
||||
@@ -248,29 +255,9 @@ class KDB(BayesBase):
|
||||
show_progress=show_progress, random_state=random_state
|
||||
)
|
||||
|
||||
def _check_params_fit(self, X, y, kwargs):
|
||||
"""Check the parameters passed to fit"""
|
||||
# Check that X and y have correct shape
|
||||
X, y = check_X_y(X, y)
|
||||
# Store the classes seen during fit
|
||||
self.classes_ = unique_labels(y)
|
||||
# Default values
|
||||
self.class_name_ = "class"
|
||||
self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
|
||||
self.head_ = 0
|
||||
def _check_params(self, X, y, kwargs):
|
||||
expected_args = ["class_name", "features"]
|
||||
for key, value in kwargs.items():
|
||||
if key in expected_args:
|
||||
setattr(self, f"{key}_", value)
|
||||
else:
|
||||
raise ValueError(f"Unexpected argument: {key}")
|
||||
if self.random_state is not None:
|
||||
random.seed(self.random_state)
|
||||
if len(self.features_) != X.shape[1]:
|
||||
raise ValueError(
|
||||
"Number of features does not match the number of columns in X"
|
||||
)
|
||||
return X, y
|
||||
return self._check_params_fit(X, y, expected_args, kwargs)
|
||||
|
||||
def _build(self):
|
||||
"""
|
||||
@@ -335,12 +322,55 @@ class KDB(BayesBase):
|
||||
S_nodes.append(idx)
|
||||
self.dag_ = dag
|
||||
|
||||
|
||||
class AODE(BayesBase, BaseEnsemble):
|
||||
def __init__(self, show_progress=False, random_state=None):
|
||||
super().__init__(
|
||||
show_progress=show_progress, random_state=random_state
|
||||
)
|
||||
|
||||
def _check_params(self, X, y, kwargs):
|
||||
expected_args = ["class_name", "features"]
|
||||
return self._check_params_fit(X, y, expected_args, kwargs)
|
||||
|
||||
def _build(self):
|
||||
|
||||
self.dag_ = None
|
||||
|
||||
def _train(self):
|
||||
self.model_ = BayesianNetwork(
|
||||
self.dag_.edges(), show_progress=self.show_progress
|
||||
)
|
||||
self.model_.fit(
|
||||
self.dataset_,
|
||||
estimator=BayesianEstimator,
|
||||
prior_type="K2",
|
||||
)
|
||||
"""Build SPODE estimators (Super Parent One Dependent Estimator)"""
|
||||
self.models_ = []
|
||||
class_edges = [(self.class_name_, f) for f in self.features_]
|
||||
for idx in range(len(self.features_)):
|
||||
feature_edges = [
|
||||
(self.features_[idx], f)
|
||||
for f in self.features_
|
||||
if f != self.features_[idx]
|
||||
]
|
||||
feature_edges.extend(class_edges)
|
||||
model = BayesianNetwork(
|
||||
feature_edges, show_progress=self.show_progress
|
||||
)
|
||||
model.fit(
|
||||
self.dataset_,
|
||||
estimator=BayesianEstimator,
|
||||
prior_type="K2",
|
||||
)
|
||||
self.models_.append(model)
|
||||
|
||||
def plot(self, title=""):
|
||||
for idx, model in enumerate(self.models_):
|
||||
self.model_ = model
|
||||
super().plot(title=f"{idx} {title}")
|
||||
|
||||
def predict(self, X: np.ndarray) -> np.ndarray:
|
||||
check_is_fitted(self, ["X_", "y_", "fitted_"])
|
||||
# Input validation
|
||||
X = self._validate_data(X, reset=False)
|
||||
n_samples = X.shape[0]
|
||||
n_estimators = len(self.models_)
|
||||
result = np.empty((n_samples, n_estimators))
|
||||
dataset = pd.DataFrame(X, columns=self.features_, dtype="int16")
|
||||
for index, model in enumerate(self.models_):
|
||||
result[:, index] = model.predict(dataset).values.ravel()
|
||||
return mode(result, axis=1).mode.ravel()
|
||||
|
79
bayesclass/tests/test_AODE.py
Normal file
79
bayesclass/tests/test_AODE.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import pytest
|
||||
import numpy as np
|
||||
from sklearn.datasets import load_iris
|
||||
from sklearn.preprocessing import KBinsDiscretizer
|
||||
|
||||
|
||||
from bayesclass import AODE
|
||||
from .._version import __version__
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def data():
|
||||
X, y = load_iris(return_X_y=True)
|
||||
enc = KBinsDiscretizer(encode="ordinal")
|
||||
return enc.fit_transform(X), y
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def clf():
|
||||
return AODE()
|
||||
|
||||
|
||||
def test_AODE_default_hyperparameters(data, clf):
|
||||
# Test default values of hyperparameters
|
||||
assert not clf.show_progress
|
||||
assert clf.random_state is None
|
||||
clf = AODE(show_progress=True, random_state=17)
|
||||
assert clf.show_progress
|
||||
assert clf.random_state == 17
|
||||
clf.fit(*data)
|
||||
assert clf.class_name_ == "class"
|
||||
assert clf.features_ == [
|
||||
"feature_0",
|
||||
"feature_1",
|
||||
"feature_2",
|
||||
"feature_3",
|
||||
]
|
||||
|
||||
|
||||
def test_AODE_version(clf):
|
||||
"""Check AODE version."""
|
||||
assert __version__ == clf.version()
|
||||
|
||||
|
||||
def test_AODE_nodes_leaves(clf):
|
||||
assert clf.nodes_leaves() == (0, 0)
|
||||
|
||||
|
||||
def test_AODE_classifier(data, clf):
|
||||
clf.fit(*data)
|
||||
attribs = ["classes_", "X_", "y_", "features_", "class_name_"]
|
||||
for attr in attribs:
|
||||
assert hasattr(clf, attr)
|
||||
X = data[0]
|
||||
y = data[1]
|
||||
y_pred = clf.predict(X)
|
||||
assert y_pred.shape == (X.shape[0],)
|
||||
assert sum(y == y_pred) == 147
|
||||
|
||||
|
||||
def test_AODE_wrong_num_features(data, clf):
|
||||
with pytest.raises(
|
||||
ValueError,
|
||||
match="Number of features does not match the number of columns in X",
|
||||
):
|
||||
clf.fit(*data, features=["feature_1", "feature_2"])
|
||||
|
||||
|
||||
def test_AODE_wrong_hyperparam(data, clf):
|
||||
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
|
||||
clf.fit(*data, wrong_param="wrong_param")
|
||||
|
||||
|
||||
def test_AODE_error_size_predict(data, clf):
|
||||
X, y = data
|
||||
clf.fit(X, y)
|
||||
with pytest.raises(ValueError):
|
||||
X_diff_size = np.ones((10, X.shape[1] + 1))
|
||||
clf.predict(X_diff_size)
|
@@ -30,6 +30,7 @@ def test_KDB_default_hyperparameters(data, clf):
|
||||
clf = KDB(show_progress=True, random_state=17, k=3)
|
||||
assert clf.show_progress
|
||||
assert clf.random_state == 17
|
||||
assert clf.k == 3
|
||||
clf.fit(*data)
|
||||
assert clf.class_name_ == "class"
|
||||
assert clf.features_ == [
|
||||
@@ -41,7 +42,7 @@ def test_KDB_default_hyperparameters(data, clf):
|
||||
|
||||
|
||||
def test_KDB_version(clf):
|
||||
"""Check TAN version."""
|
||||
"""Check KDB version."""
|
||||
assert __version__ == clf.version()
|
||||
|
||||
|
||||
@@ -58,7 +59,7 @@ def test_KDB_classifier(data, clf):
|
||||
y = data[1]
|
||||
y_pred = clf.predict(X)
|
||||
assert y_pred.shape == (X.shape[0],)
|
||||
assert sum(y == y_pred) == 147
|
||||
assert sum(y == y_pred) == 148
|
||||
|
||||
|
||||
@image_comparison(
|
||||
|
@@ -2,12 +2,13 @@ import pytest
|
||||
|
||||
from sklearn.utils.estimator_checks import check_estimator
|
||||
|
||||
from bayesclass import TAN
|
||||
from bayesclass import TAN, KDB, AODE
|
||||
|
||||
|
||||
@pytest.mark.parametrize("estimator", [TAN()])
|
||||
@pytest.mark.parametrize("estimator", [TAN(), KDB(k=2), AODE()])
|
||||
# @pytest.mark.parametrize("estimator", [AODE()])
|
||||
def test_all_estimators(estimator):
|
||||
i = 0
|
||||
for estimator, test in check_estimator(estimator, generate_only=True):
|
||||
print(i := i + 1, test, "classes_")
|
||||
print(i := i + 1, test)
|
||||
# test(estimator)
|
||||
|
Reference in New Issue
Block a user