Merge pull request #3 from Doctorado-ML/localdiscretization

Localdiscretization
Ricardo Montañana Gómez, 2023-05-15 11:42:52 +02:00, committed by GitHub
17 changed files with 942 additions and 208 deletions

bayesclass/__init__.py

@@ -16,4 +16,6 @@ __all__ = [
"TAN",
"KDB",
"AODE",
"KDBNew",
"AODENew",
]
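A minimal usage sketch of the two new exports (assumptions: bayesclass is installed and the New variants follow the scikit-learn fit/predict convention of the existing classifiers; they discretize continuous features internally via FImdlp, so raw iris values can be passed directly):

from sklearn.datasets import load_iris
from bayesclass.clfs import KDBNew, AODENew

X, y = load_iris(return_X_y=True)
clf = KDBNew(k=2)          # k parents per feature, as in KDB
clf.fit(X, y)              # FImdlp discretization happens inside fit
print(clf.predict(X[:5]))  # predicted labels for the first five samples
ens = AODENew().fit(X, y)  # ensemble counterpart, same interface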

bayesclass/clfs.py

@@ -3,7 +3,7 @@ import warnings
import numpy as np
import pandas as pd
from scipy.stats import mode
from sklearn.base import ClassifierMixin, BaseEstimator
from sklearn.base import clone, ClassifierMixin, BaseEstimator
from sklearn.ensemble import BaseEnsemble
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import unique_labels
@@ -12,9 +12,14 @@ import networkx as nx
from pgmpy.estimators import TreeSearch, BayesianEstimator
from pgmpy.models import BayesianNetwork
import matplotlib.pyplot as plt
from fimdlp.mdlp import FImdlp
from ._version import __version__
def default_feature_names(num_features):
return [f"feature_{i}" for i in range(num_features)]
class BayesBase(BaseEstimator, ClassifierMixin):
def __init__(self, random_state, show_progress):
self.random_state = random_state
@@ -38,6 +43,16 @@ class BayesBase(BaseEstimator, ClassifierMixin):
return len(self.dag_), len(self.dag_.edges())
return 0, 0
@staticmethod
def default_class_name():
return "class"
def build_dataset(self):
self.dataset_ = pd.DataFrame(
self.X_, columns=self.feature_names_in_, dtype=np.int32
)
self.dataset_[self.class_name_] = self.y_
def _check_params_fit(self, X, y, expected_args, kwargs):
"""Check the common parameters passed to fit"""
# Check that X and y have correct shape
@@ -47,14 +62,18 @@ class BayesBase(BaseEstimator, ClassifierMixin):
self.classes_ = unique_labels(y)
self.n_classes_ = self.classes_.shape[0]
# Default values
self.class_name_ = "class"
self.features_ = [f"feature_{i}" for i in range(X.shape[1])]
self.class_name_ = self.default_class_name()
self.features_ = default_feature_names(X.shape[1])
for key, value in kwargs.items():
if key in expected_args:
setattr(self, f"{key}_", value)
else:
raise ValueError(f"Unexpected argument: {key}")
self.feature_names_in_ = self.features_
# used for local discretization
self.indexed_features_ = {
feature: i for i, feature in enumerate(self.features_)
}
if self.random_state is not None:
random.seed(self.random_state)
if len(self.feature_names_in_) != X.shape[1]:
@@ -75,7 +94,7 @@ class BayesBase(BaseEstimator, ClassifierMixin):
return self.states_
def fit(self, X, y, **kwargs):
"""A reference implementation of a fitting function for a classifier.
"""Fit classifier
Parameters
----------
@@ -116,10 +135,7 @@ class BayesBase(BaseEstimator, ClassifierMixin):
# Store the information needed to build the model
self.X_ = X_
self.y_ = y_
self.dataset_ = pd.DataFrame(
self.X_, columns=self.feature_names_in_, dtype=np.int32
)
self.dataset_[self.class_name_] = self.y_
self.build_dataset()
# Build the DAG
self._build()
# Train the model
@@ -130,6 +146,9 @@ class BayesBase(BaseEstimator, ClassifierMixin):
# Return the classifier
return self
def _build(self):
...
def _train(self, kwargs):
self.model_ = BayesianNetwork(
self.dag_.edges(), show_progress=self.show_progress
@@ -190,7 +209,6 @@ class BayesBase(BaseEstimator, ClassifierMixin):
"""
# Check if fit has been called
check_is_fitted(self, ["X_", "y_", "fitted_"])
# Input validation
X = check_array(X)
dataset = pd.DataFrame(
@@ -260,37 +278,38 @@ class TAN(BayesBase):
return X, y
def _build(self):
# est = TreeSearch(self.dataset_,
# root_node=self.feature_names_in_[self.head_])
# self.dag_ = est.estimate(
# estimator_type="tan",
# class_node=self.class_name_,
# show_progress=self.show_progress,
# )
est = TreeSearch(
self.dataset_, root_node=self.feature_names_in_[self.head_]
)
self.dag_ = est.estimate(
estimator_type="tan",
class_node=self.class_name_,
show_progress=self.show_progress,
)
# Code taken from pgmpy
n_jobs = -1
weights = TreeSearch._get_conditional_weights(
self.dataset_,
self.class_name_,
"mutual_info",
n_jobs,
self.show_progress,
)
# Step 4.2: Construct chow-liu DAG on {data.columns - class_node}
class_node_idx = np.where(self.dataset_.columns == self.class_name_)[
0
][0]
weights = np.delete(weights, class_node_idx, axis=0)
weights = np.delete(weights, class_node_idx, axis=1)
reduced_columns = np.delete(self.dataset_.columns, class_node_idx)
D = TreeSearch._create_tree_and_dag(
weights, reduced_columns, self.feature_names_in_[self.head_]
)
# Step 4.3: Add edges from class_node to all other nodes.
D.add_edges_from(
[(self.class_name_, node) for node in reduced_columns]
)
self.dag_ = D
# n_jobs = -1
# weights = TreeSearch._get_conditional_weights(
# self.dataset_,
# self.class_name_,
# "mutual_info",
# n_jobs,
# self.show_progress,
# )
# # Step 4.2: Construct chow-liu DAG on {data.columns - class_node}
# class_node_idx = np.where(self.dataset_.columns == self.class_name_)[
# 0
# ][0]
# weights = np.delete(weights, class_node_idx, axis=0)
# weights = np.delete(weights, class_node_idx, axis=1)
# reduced_columns = np.delete(self.dataset_.columns, class_node_idx)
# D = TreeSearch._create_tree_and_dag(
# weights, reduced_columns, self.feature_names_in_[self.head_]
# )
# # Step 4.3: Add edges from class_node to all other nodes.
# D.add_edges_from(
# [(self.class_name_, node) for node in reduced_columns]
# )
# self.dag_ = D
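For context, the newly active path above is just pgmpy's built-in TAN estimation, which the now commented-out hand-rolled Chow-Liu code replicated. A standalone sketch (assumption: df is a discrete pandas DataFrame whose columns are the features plus a "class" column; the names are illustrative):

from pgmpy.estimators import TreeSearch

est = TreeSearch(df, root_node="feature_0")  # the head feature is the tree root
dag = est.estimate(
    estimator_type="tan",  # Chow-Liu tree over the features, plus
    class_node="class",    # arcs from the class node to every feature
    show_progress=False,
)
print(sorted(dag.edges()))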
class KDB(BayesBase):
@@ -323,7 +342,7 @@ class KDB(BayesBase):
# Loops are not allowed
pass
cond_w[idx, max_minfo] = -1
exit_cond = num == n_edges or np.all(cond_w[idx, :] <= 0)
exit_cond = num == n_edges or np.all(cond_w[idx, :] <= self.theta)
def _build(self):
"""
@@ -345,7 +364,6 @@ class KDB(BayesBase):
Compute the conditional probability inferred by the structure of the BN,
using counts from the DB, and output the BN.
"""
# 1. get the mutual information between each feature and the class
mutual = mutual_info_classif(self.X_, self.y_, discrete_features=True)
# 2. symmetric matrix where each element represents I(X, Y| class_node)
@@ -354,42 +372,100 @@ class KDB(BayesBase):
)._get_conditional_weights(
self.dataset_, self.class_name_, show_progress=self.show_progress
)
# 3.
# 3. Let the used variable list, S, be empty.
S_nodes = []
# 4.
# 4. Let the BN being constructed, BN, begin with a single class node
dag = BayesianNetwork()
dag.add_node(self.class_name_) # , state_names=self.classes_)
# 5. 5.1
# 5. Repeat until S includes all domain features
# 5.1 Select feature Xmax which is not in S and has the largest value of mutual information with the class
for idx in np.argsort(mutual):
# 5.2
# 5.2 Add a node to BN representing Xmax.
feature = self.feature_names_in_[idx]
dag.add_node(feature)
# 5.3
# 5.3 Add an arc from C to Xmax in BN.
dag.add_edge(self.class_name_, feature)
# 5.4
# 5.4 Add m = min(|S|, k) arcs from m distinct features Xj in S
self._add_m_edges(dag, idx, S_nodes, conditional_weights)
# 5.5
# 5.5 Add Xmax to S.
S_nodes.append(idx)
self.dag_ = dag
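The visiting order of step 5 comes straight from step 1. A toy sketch of how mutual_info_classif drives the loop (hypothetical data; note that np.argsort returns indices in ascending order of mutual information):

import numpy as np
from sklearn.feature_selection import mutual_info_classif

X = np.array([[0, 1, 0], [1, 1, 0], [0, 0, 1], [1, 0, 1]])
y = np.array([0, 0, 1, 1])
mutual = mutual_info_classif(X, y, discrete_features=True)
for idx in np.argsort(mutual):  # same iteration order as _build above
    print(idx, mutual[idx])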
class AODE(BayesBase, BaseEnsemble):
def __init__(self, show_progress=False, random_state=None):
super().__init__(
show_progress=show_progress, random_state=random_state
)
def build_spodes(features, class_name):
"""Build SPODE estimators (Super Parent One Dependent Estimator)"""
class_edges = [(class_name, f) for f in features]
for idx in range(len(features)):
feature_edges = [
(features[idx], f) for f in features if f != features[idx]
]
feature_edges.extend(class_edges)
model = BayesianNetwork(feature_edges, show_progress=False)
yield model
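For n features, build_spodes yields n DAGs, one per super parent. A quick sketch of the edge sets it produces (hypothetical feature names):

for i, spode in enumerate(build_spodes(["f0", "f1", "f2"], "class")):
    print(i, sorted(spode.edges()))
# Each DAG contains class->fj for every feature fj plus fi->fj for every
# j != i, i.e. feature fi acts as the super parent of the i-th estimator.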
class SPODE(BayesBase):
def _check_params(self, X, y, kwargs):
expected_args = ["class_name", "features", "state_names"]
return self._check_params_fit(X, y, expected_args, kwargs)
def nodes_edges(self):
nodes = 0
edges = 0
class AODE(ClassifierMixin, BaseEnsemble):
def __init__(
self,
show_progress=False,
random_state=None,
estimator=None,
):
self.show_progress = show_progress
self.random_state = random_state
super().__init__(estimator=estimator)
def _validate_estimator(self) -> None:
"""Check the estimator and set the estimator_ attribute."""
super()._validate_estimator(
default=SPODE(
random_state=self.random_state,
show_progress=self.show_progress,
)
)
def fit(self, X, y, **kwargs):
self.n_features_in_ = X.shape[1]
self.feature_names_in_ = kwargs.get(
"features", default_feature_names(self.n_features_in_)
)
self.class_name_ = kwargs.get("class_name", "class")
# build estimator
self._validate_estimator()
self.X_ = X
self.y_ = y
self.estimators_ = []
self._train(kwargs)
# To keep compatibility with the benchmark platform
self.fitted_ = True
self.nodes_leaves = self.nodes_edges
return self
def _train(self, kwargs):
for dag in build_spodes(self.feature_names_in_, self.class_name_):
estimator = clone(self.estimator_)
estimator.dag_ = estimator.model_ = dag
estimator.fit(self.X_, self.y_, **kwargs)
self.estimators_.append(estimator)
def predict(self, X: np.ndarray) -> np.ndarray:
n_samples = X.shape[0]
n_estimators = len(self.estimators_)
result = np.empty((n_samples, n_estimators))
for index, estimator in enumerate(self.estimators_):
result[:, index] = estimator.predict(X)
return mode(result, axis=1, keepdims=False).mode.ravel()
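The ensemble prediction is a plain majority vote over the per-SPODE predictions. A worked sketch of the mode call (hypothetical votes):

import numpy as np
from scipy.stats import mode

votes = np.array([[0, 0, 1],   # sample 0: majority is 0
                  [2, 2, 2],   # sample 1: unanimous 2
                  [1, 0, 1]])  # sample 2: majority is 1
print(mode(votes, axis=1, keepdims=False).mode.ravel())  # [0 2 1]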
def version(self):
if hasattr(self, "fitted_"):
nodes = sum([len(x) for x in self.models_])
edges = sum([len(x.edges()) for x in self.models_])
return nodes, edges
return self.estimator_.version()
return SPODE(None, False).version()
@property
def states_(self):
@@ -397,54 +473,293 @@ class AODE(BayesBase, BaseEnsemble):
return sum(
[
len(item)
for model in self.models_
for _, item in model.states.items()
for model in self.estimators_
for _, item in model.model_.states.items()
]
) / len(self.models_)
) / len(self.estimators_)
return 0
def _build(self):
self.dag_ = None
@property
def depth_(self):
return self.states_
def _train(self, kwargs):
"""Build SPODE estimators (Super Parent One Dependent Estimator)"""
self.models_ = []
class_edges = [(self.class_name_, f) for f in self.feature_names_in_]
states = dict(state_names=kwargs.pop("state_names", []))
for idx in range(self.n_features_in_):
feature_edges = [
(self.feature_names_in_[idx], f)
for f in self.feature_names_in_
if f != self.feature_names_in_[idx]
]
feature_edges.extend(class_edges)
model = BayesianNetwork(
feature_edges, show_progress=self.show_progress
)
model.fit(
self.dataset_,
estimator=BayesianEstimator,
prior_type="K2",
**states,
)
self.models_.append(model)
def nodes_edges(self):
nodes = 0
edges = 0
if hasattr(self, "fitted_"):
nodes = sum([len(x.dag_) for x in self.estimators_])
edges = sum([len(x.dag_.edges()) for x in self.estimators_])
return nodes, edges
def plot(self, title=""):
warnings.simplefilter("ignore", UserWarning)
for idx, model in enumerate(self.models_):
self.model_ = model
super().plot(title=f"{idx} {title}")
for idx, model in enumerate(self.estimators_):
model.plot(title=f"{idx} {title}")
class TANNew(TAN):
def __init__(
self,
show_progress=False,
random_state=None,
discretizer_depth=1e6,
discretizer_length=3,
discretizer_cuts=0,
):
self.discretizer_depth = discretizer_depth
self.discretizer_length = discretizer_length
self.discretizer_cuts = discretizer_cuts
super().__init__(
show_progress=show_progress, random_state=random_state
)
def fit(self, X, y, **kwargs):
self.estimator_ = Proposal(self)
self.estimator_.fit(X, y, **kwargs)
return self
def predict(self, X):
return self.estimator_.predict(X)
class KDBNew(KDB):
def __init__(
self,
k=2,
show_progress=False,
random_state=None,
discretizer_depth=1e6,
discretizer_length=3,
discretizer_cuts=0,
):
self.discretizer_depth = discretizer_depth
self.discretizer_length = discretizer_length
self.discretizer_cuts = discretizer_cuts
super().__init__(
k=k, show_progress=show_progress, random_state=random_state
)
def fit(self, X, y, **kwargs):
self.estimator_ = Proposal(self)
self.estimator_.fit(X, y, **kwargs)
return self
def predict(self, X):
return self.estimator_.predict(X)
class SPODENew(SPODE):
"""This class implements a classifier for the SPODE algorithm similar to
TANNew and KDBNew"""
def __init__(
self,
random_state,
show_progress,
discretizer_depth=1e6,
discretizer_length=3,
discretizer_cuts=0,
):
super().__init__(
random_state=random_state, show_progress=show_progress
)
self.discretizer_depth = discretizer_depth
self.discretizer_length = discretizer_length
self.discretizer_cuts = discretizer_cuts
class AODENew(AODE):
def __init__(
self,
random_state=None,
show_progress=False,
discretizer_depth=1e6,
discretizer_length=3,
discretizer_cuts=0,
):
self.discretizer_depth = discretizer_depth
self.discretizer_length = discretizer_length
self.discretizer_cuts = discretizer_cuts
super().__init__(
random_state=random_state,
show_progress=show_progress,
estimator=Proposal(
SPODENew(
random_state=random_state,
show_progress=show_progress,
discretizer_depth=discretizer_depth,
discretizer_length=discretizer_length,
discretizer_cuts=discretizer_cuts,
)
),
)
def _train(self, kwargs):
for dag in build_spodes(self.feature_names_in_, self.class_name_):
proposal = clone(self.estimator_)
proposal.estimator.dag_ = proposal.estimator.model_ = dag
self.estimators_.append(proposal.fit(self.X_, self.y_, **kwargs))
self.n_estimators_ = len(self.estimators_)
def predict(self, X: np.ndarray) -> np.ndarray:
check_is_fitted(self, ["X_", "y_", "fitted_"])
# Input validation
X = check_array(X)
n_samples = X.shape[0]
n_estimators = len(self.models_)
result = np.empty((n_samples, n_estimators))
dataset = pd.DataFrame(
X, columns=self.feature_names_in_, dtype=np.int32
)
for index, model in enumerate(self.models_):
result[:, index] = model.predict(dataset).values.ravel()
result = np.empty((X.shape[0], self.n_estimators_))
for index, model in enumerate(self.estimators_):
result[:, index] = model.predict(X)
return mode(result, axis=1, keepdims=False).mode.ravel()
@property
def states_(self):
if hasattr(self, "fitted_"):
return sum(
[
len(item)
for model in self.estimators_
for _, item in model.estimator.model_.states.items()
]
) / len(self.estimators_)
return 0
@property
def depth_(self):
return self.states_
def nodes_edges(self):
nodes = 0
edges = 0
if hasattr(self, "fitted_"):
nodes = sum([len(x.estimator.dag_) for x in self.estimators_])
edges = sum(
[len(x.estimator.dag_.edges()) for x in self.estimators_]
)
return nodes, edges
def plot(self, title=""):
warnings.simplefilter("ignore", UserWarning)
for idx, model in enumerate(self.estimators_):
model.estimator.plot(title=f"{idx} {title}")
def version(self):
if hasattr(self, "fitted_"):
return self.estimator_.estimator.version()
return SPODENew(None, False).version()
class Proposal(BaseEstimator):
def __init__(self, estimator):
self.estimator = estimator
self.class_type = estimator.__class__
def fit(self, X, y, **kwargs):
# Check parameters
self.estimator._check_params(X, y, kwargs)
# Discretize train data
self.discretizer_ = FImdlp(
n_jobs=1,
max_depth=self.estimator.discretizer_depth,
min_length=self.estimator.discretizer_length,
max_cuts=self.estimator.discretizer_cuts,
)
self.Xd = self.discretizer_.fit_transform(X, y)
kwargs = self.update_kwargs(y, kwargs)
# Build the model
super(self.class_type, self.estimator).fit(self.Xd, y, **kwargs)
# Local discretization based on the model
self._local_discretization()
# self.check_integrity("fit", self.Xd)
self.fitted_ = True
return self
def predict(self, X):
# Check if fit has been called
check_is_fitted(self, ["fitted_"])
# Input validation
X = check_array(X)
Xd = self.discretizer_.transform(X)
# self.check_integrity("predict", Xd)
return super(self.class_type, self.estimator).predict(Xd)
def update_kwargs(self, y, kwargs):
features = (
kwargs["features"]
if "features" in kwargs
else default_feature_names(self.Xd.shape[1])
)
states = {
features[i]: self.discretizer_.get_states_feature(i)
for i in range(self.Xd.shape[1])
}
class_name = (
kwargs["class_name"]
if "class_name" in kwargs
else self.estimator.default_class_name()
)
states[class_name] = np.unique(y).tolist()
kwargs["state_names"] = states
self.state_names_ = states
self.features_ = features
kwargs["features"] = features
kwargs["class_name"] = class_name
return kwargs
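After update_kwargs, the wrapped estimator receives explicit state names for every variable. A sketch of the resulting kwargs for a hypothetical two-feature problem (all values illustrative):

expected_kwargs = {
    "features": ["feature_0", "feature_1"],
    "class_name": "class",
    "state_names": {
        "feature_0": [0, 1, 2],  # discretizer_.get_states_feature(0)
        "feature_1": [0, 1],     # discretizer_.get_states_feature(1)
        "class": [0, 1, 2],      # np.unique(y).tolist()
    },
}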
def _local_discretization(self):
"""Discretize each feature with its fathers and the class"""
upgrade = False
# The order of local discretization matters; a plain 0, 1, 2, ... pass is not valid
ancestral_order = list(nx.topological_sort(self.estimator.dag_))
for feature in ancestral_order:
if feature == self.estimator.class_name_:
continue
idx = self.estimator.indexed_features_[feature]
fathers = self.estimator.dag_.get_parents(feature)
if len(fathers) > 1:
# First remove the class name as it will be added later
fathers.remove(self.estimator.class_name_)
# Get the fathers indices
features = [
self.estimator.indexed_features_[f] for f in fathers
]
# Update the discretization of the feature
self.Xd[:, idx] = self.discretizer_.join_fit(
# each feature has to use the previously discretized data
target=idx,
features=features,
data=self.Xd,
)
upgrade = True
if upgrade:
# Update the dataset
self.estimator.X_ = self.Xd
self.estimator.build_dataset()
self.state_names_ = {
key: self.discretizer_.get_states_feature(value)
for key, value in self.estimator.indexed_features_.items()
}
states = {"state_names": self.state_names_}
# Update the model
self.estimator.model_.fit(
self.estimator.dataset_,
estimator=BayesianEstimator,
prior_type="K2",
**states,
)
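Why the ancestral order matters: join_fit re-discretizes a feature against its parents' already updated values, so every parent must be processed before its children. A toy illustration with networkx (hypothetical DAG):

import networkx as nx

dag = nx.DiGraph([("class", "f0"), ("class", "f1"), ("f0", "f1")])
print(list(nx.topological_sort(dag)))  # e.g. ['class', 'f0', 'f1']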
# def check_integrity(self, source, X):
# # print(f"Checking integrity of {source} data")
# for i in range(X.shape[1]):
# if not set(np.unique(X[:, i]).tolist()).issubset(
# set(self.state_names_[self.features_[i]])
# ):
# print(
# "i",
# i,
# "features[i]",
# self.features_[i],
# "np.unique(X[:, i])",
# np.unique(X[:, i]),
# "np.array(state_names[features[i]])",
# np.array(self.state_names_[self.features_[i]]),
# )
# raise ValueError("Discretization error")

bayesclass/test.py (new file, 19 lines)

@@ -0,0 +1,19 @@
from bayesclass.clfs import AODENew, TANNew, KDBNew, AODE
from benchmark.datasets import Datasets
import os
os.chdir("../discretizbench")
dt = Datasets()
clfan = AODENew()
clftn = TANNew()
clfkn = KDBNew()
# clfa = AODE()
X, y = dt.load("iris")
# clfa.fit(X, y)
clfan.fit(X, y)
clftn.fit(X, y)
clfkn.fit(X, y)
self.discretizer_.target_
self.estimator.indexed_features_

Binary files not shown: five PNG baseline images for the plot tests (four added, 44-55 KiB each; one updated, 50 KiB to 49 KiB).

bayesclass/tests/conftest.py (new file)

@@ -0,0 +1,38 @@
import pytest
from sklearn.datasets import load_iris
from fimdlp.mdlp import FImdlp
@pytest.fixture
def iris():
dataset = load_iris()
X = dataset["data"]
y = dataset["target"]
features = dataset["feature_names"]
# Make the iris dataset match the values of our iris.arff dataset
patch = {(34, 3): (0.2, 0.1), (37, 1): (3.6, 3.1), (37, 2): (1.4, 1.5)}
for key, value in patch.items():
X[key] = value[1]
return X, y, features
@pytest.fixture
def data(iris):
return iris[0], iris[1]
@pytest.fixture
def features(iris):
return iris[2]
@pytest.fixture
def class_name():
return "class"
@pytest.fixture
def data_disc(data):
clf = FImdlp()
X, y = data
return clf.fit_transform(X, y), y
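The data_disc fixture is what most of the reworked tests consume; the essence of its effect as a standalone sketch (the shape is factual for iris, the bin codes themselves depend on the MDLP cuts):

from sklearn.datasets import load_iris
from fimdlp.mdlp import FImdlp

X, y = load_iris(return_X_y=True)
Xd = FImdlp().fit_transform(X, y)  # integer bin codes, one column per feature
print(Xd.shape)                    # (150, 4)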

bayesclass/tests/test_AODE.py

@@ -1,6 +1,5 @@
import pytest
import numpy as np
from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
@@ -10,26 +9,19 @@ from bayesclass.clfs import AODE
from .._version import __version__
@pytest.fixture
def data():
X, y = load_iris(return_X_y=True)
enc = KBinsDiscretizer(encode="ordinal")
return enc.fit_transform(X), y
@pytest.fixture
def clf():
return AODE()
return AODE(random_state=17)
def test_AODE_default_hyperparameters(data, clf):
def test_AODE_default_hyperparameters(data_disc, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state is None
clf = AODE(show_progress=True, random_state=17)
assert clf.show_progress
assert clf.random_state == 17
clf.fit(*data)
clf = AODE(show_progress=True)
assert clf.show_progress
assert clf.random_state is None
clf.fit(*data_disc)
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
"feature_0",
@@ -42,67 +34,66 @@ def test_AODE_default_hyperparameters(data, clf):
@image_comparison(
baseline_images=["line_dashes_AODE"], remove_text=True, extensions=["png"]
)
def test_AODE_plot(data, clf):
def test_AODE_plot(data_disc, features, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
dataset = load_iris(as_frame=True)
clf.fit(*data, features=dataset["feature_names"])
clf.fit(*data_disc, features=features)
clf.plot("AODE Iris")
def test_AODE_version(clf):
def test_AODE_version(clf, features, data_disc):
"""Check AODE version."""
assert __version__ == clf.version()
clf.fit(*data_disc, features=features)
assert __version__ == clf.version()
def test_AODE_nodes_edges(clf, data):
def test_AODE_nodes_edges(clf, data_disc):
assert clf.nodes_edges() == (0, 0)
clf.fit(*data)
clf.fit(*data_disc)
assert clf.nodes_leaves() == (20, 28)
def test_AODE_states(clf, data):
def test_AODE_states(clf, data_disc):
assert clf.states_ == 0
clf = AODE(random_state=17)
clf.fit(*data)
assert clf.states_ == 23
clf.fit(*data_disc)
assert clf.states_ == 19
assert clf.depth_ == clf.states_
def test_AODE_classifier(data, clf):
clf.fit(*data)
def test_AODE_classifier(data_disc, clf):
clf.fit(*data_disc)
attribs = [
"classes_",
"X_",
"y_",
"feature_names_in_",
"class_name_",
"n_features_in_",
"X_",
"y_",
]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
X = data_disc[0]
y = data_disc[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 147
assert sum(y == y_pred) == 146
def test_AODE_wrong_num_features(data, clf):
def test_AODE_wrong_num_features(data_disc, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
clf.fit(*data_disc, features=["feature_1", "feature_2"])
def test_AODE_wrong_hyperparam(data, clf):
def test_AODE_wrong_hyperparam(data_disc, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
clf.fit(*data_disc, wrong_param="wrong_param")
def test_AODE_error_size_predict(data, clf):
X, y = data
def test_AODE_error_size_predict(data_disc, clf):
X, y = data_disc
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))

bayesclass/tests/test_AODENew.py (new file)

@@ -0,0 +1,123 @@
import pytest
import numpy as np
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
from bayesclass.clfs import AODENew
from .._version import __version__
@pytest.fixture
def clf():
return AODENew(random_state=17)
def test_AODENew_default_hyperparameters(data, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state == 17
clf = AODENew(show_progress=True)
assert clf.show_progress
assert clf.random_state is None
clf.fit(*data)
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
"feature_0",
"feature_1",
"feature_2",
"feature_3",
]
@image_comparison(
baseline_images=["line_dashes_AODENew"],
remove_text=True,
extensions=["png"],
)
def test_AODENew_plot(data, features, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
clf.fit(*data, features=features)
clf.plot("AODE Iris")
def test_AODENew_version(clf, data):
"""Check AODENew version."""
assert __version__ == clf.version()
clf.fit(*data)
assert __version__ == clf.version()
def test_AODENew_nodes_edges(clf, data):
assert clf.nodes_edges() == (0, 0)
clf.fit(*data)
assert clf.nodes_leaves() == (20, 28)
def test_AODENew_states(clf, data):
assert clf.states_ == 0
clf.fit(*data)
assert clf.states_ == 17.75
assert clf.depth_ == clf.states_
def test_AODENew_classifier(data, clf):
clf.fit(*data)
attribs = [
"feature_names_in_",
"class_name_",
"n_features_in_",
"X_",
"y_",
]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 146
def test_AODENew_local_discretization(clf, data_disc):
expected_data = [
[-1, [0, -1], [0, -1], [0, -1]],
[[1, -1], -1, [1, -1], [1, -1]],
[[2, -1], [2, -1], -1, [2, -1]],
[[3, -1], [3, -1], [3, -1], -1],
]
clf.fit(*data_disc)
for idx, estimator in enumerate(clf.estimators_):
expected = expected_data[idx]
for feature in range(4):
computed = estimator.discretizer_.target_[feature]
if type(computed) == list:
for j, k in zip(expected[feature], computed):
assert j == k
else:
assert (
expected[feature]
== estimator.discretizer_.target_[feature]
)
def test_AODENew_wrong_num_features(data, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
def test_AODENew_wrong_hyperparam(data, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
def test_AODENew_error_size_predict(data, clf):
X, y = data
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))
clf.predict(X_diff_size)

bayesclass/tests/test_KDB.py

@@ -1,6 +1,5 @@
import pytest
import numpy as np
from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
@@ -11,19 +10,12 @@ from bayesclass.clfs import KDB
from .._version import __version__
@pytest.fixture
def data():
X, y = load_iris(return_X_y=True)
enc = KBinsDiscretizer(encode="ordinal")
return enc.fit_transform(X), y
@pytest.fixture
def clf():
return KDB(k=3)
def test_KDB_default_hyperparameters(data, clf):
def test_KDB_default_hyperparameters(data_disc, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state is None
@@ -32,7 +24,7 @@ def test_KDB_default_hyperparameters(data, clf):
assert clf.show_progress
assert clf.random_state == 17
assert clf.k == 3
clf.fit(*data)
clf.fit(*data_disc)
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
"feature_0",
@@ -47,58 +39,56 @@ def test_KDB_version(clf):
assert __version__ == clf.version()
def test_KDB_nodes_edges(clf, data):
def test_KDB_nodes_edges(clf, data_disc):
assert clf.nodes_edges() == (0, 0)
clf.fit(*data)
assert clf.nodes_leaves() == (5, 10)
clf.fit(*data_disc)
assert clf.nodes_leaves() == (5, 9)
def test_KDB_states(clf, data):
def test_KDB_states(clf, data_disc):
assert clf.states_ == 0
clf = KDB(k=3, random_state=17)
clf.fit(*data)
assert clf.states_ == 23
clf.fit(*data_disc)
assert clf.states_ == 19
assert clf.depth_ == clf.states_
def test_KDB_classifier(data, clf):
clf.fit(*data)
def test_KDB_classifier(data_disc, clf):
clf.fit(*data_disc)
attribs = ["classes_", "X_", "y_", "feature_names_in_", "class_name_"]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
X = data_disc[0]
y = data_disc[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 148
assert sum(y == y_pred) == 146
@image_comparison(
baseline_images=["line_dashes_KDB"], remove_text=True, extensions=["png"]
)
def test_KDB_plot(data, clf):
def test_KDB_plot(data_disc, features, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
dataset = load_iris(as_frame=True)
clf.fit(*data, features=dataset["feature_names"])
clf.fit(*data_disc, features=features)
clf.plot("KDB Iris")
def test_KDB_wrong_num_features(data, clf):
def test_KDB_wrong_num_features(data_disc, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
clf.fit(*data_disc, features=["feature_1", "feature_2"])
def test_KDB_wrong_hyperparam(data, clf):
def test_KDB_wrong_hyperparam(data_disc, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
clf.fit(*data_disc, wrong_param="wrong_param")
def test_KDB_error_size_predict(data, clf):
X, y = data
def test_KDB_error_size_predict(data_disc, clf):
X, y = data_disc
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))

bayesclass/tests/test_KDBNew.py (new file)

@@ -0,0 +1,133 @@
import pytest
import numpy as np
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
from pgmpy.models import BayesianNetwork
from bayesclass.clfs import KDBNew
from .._version import __version__
@pytest.fixture
def clf():
return KDBNew(k=3)
def test_KDBNew_default_hyperparameters(data, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state is None
assert clf.theta == 0.03
clf = KDBNew(show_progress=True, random_state=17, k=3)
assert clf.show_progress
assert clf.random_state == 17
assert clf.k == 3
clf.fit(*data)
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
"feature_0",
"feature_1",
"feature_2",
"feature_3",
]
def test_KDBNew_version(clf):
"""Check KDBNew version."""
assert __version__ == clf.version()
def test_KDBNew_nodes_edges(clf, data):
assert clf.nodes_edges() == (0, 0)
clf.fit(*data)
assert clf.nodes_leaves() == (5, 9)
def test_KDBNew_states(clf, data):
assert clf.states_ == 0
clf.fit(*data)
assert clf.states_ == 22
assert clf.depth_ == clf.states_
def test_KDBNew_classifier(data, clf):
clf.fit(*data)
attribs = ["classes_", "X_", "y_", "feature_names_in_", "class_name_"]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 145
def test_KDBNew_local_discretization(clf, data):
expected = [[1, -1], -1, [0, 1, 3, -1], [1, -1]]
clf.fit(*data)
for feature in range(4):
computed = clf.estimator_.discretizer_.target_[feature]
print("computed:", computed)
if type(computed) == list:
for j, k in zip(expected[feature], computed):
assert j == k
else:
assert (
expected[feature]
== clf.estimator_.discretizer_.target_[feature]
)
@image_comparison(
baseline_images=["line_dashes_KDBNew"],
remove_text=True,
extensions=["png"],
)
def test_KDBNew_plot(data, features, class_name, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
clf.fit(*data, features=features, class_name=class_name)
clf.plot("KDBNew Iris")
def test_KDBNew_wrong_num_features(data, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
def test_KDBNew_wrong_hyperparam(data, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
def test_KDBNew_error_size_predict(data, clf):
X, y = data
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))
clf.predict(X_diff_size)
def test_KDBNew_dont_do_cycles():
clf = KDBNew(k=4)
dag = BayesianNetwork()
clf.feature_names_in_ = [
"feature_0",
"feature_1",
"feature_2",
"feature_3",
]
nodes = list(range(4))
weights = np.ones((4, 4))
for idx in range(1, 4):
dag.add_edge(clf.feature_names_in_[0], clf.feature_names_in_[idx])
dag.add_edge(clf.feature_names_in_[1], clf.feature_names_in_[2])
dag.add_edge(clf.feature_names_in_[1], clf.feature_names_in_[3])
dag.add_edge(clf.feature_names_in_[2], clf.feature_names_in_[3])
for idx in range(4):
clf._add_m_edges(dag, idx, nodes, weights)
assert len(dag.edges()) == 6
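(A note on the assertion: the hand-built DAG above already holds all C(4,2) = 6 edges a four-node DAG can contain, so asserting 6 edges after the _add_m_edges calls shows the method added nothing; any further arc would duplicate an existing edge or close a cycle.)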

bayesclass/tests/test_TAN.py

@@ -1,7 +1,5 @@
import pytest
import numpy as np
from sklearn.datasets import load_iris
from sklearn.preprocessing import KBinsDiscretizer
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
@@ -10,26 +8,19 @@ from bayesclass.clfs import TAN
from .._version import __version__
@pytest.fixture
def data():
X, y = load_iris(return_X_y=True)
enc = KBinsDiscretizer(encode="ordinal")
return enc.fit_transform(X), y
@pytest.fixture
def clf():
return TAN()
return TAN(random_state=17)
def test_TAN_default_hyperparameters(data, clf):
def test_TAN_default_hyperparameters(data_disc, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state is None
clf = TAN(show_progress=True, random_state=17)
assert clf.show_progress
assert clf.random_state == 17
clf.fit(*data)
clf = TAN(show_progress=True)
assert clf.show_progress
assert clf.random_state is None
clf.fit(*data_disc)
assert clf.head_ == 0
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
@@ -45,29 +36,26 @@ def test_TAN_version(clf):
assert __version__ == clf.version()
def test_TAN_nodes_edges(clf, data):
def test_TAN_nodes_edges(clf, data_disc):
assert clf.nodes_edges() == (0, 0)
clf = TAN(random_state=17)
clf.fit(*data, head="random")
clf.fit(*data_disc, head="random")
assert clf.nodes_leaves() == (5, 7)
def test_TAN_states(clf, data):
def test_TAN_states(clf, data_disc):
assert clf.states_ == 0
clf = TAN(random_state=17)
clf.fit(*data)
assert clf.states_ == 23
clf.fit(*data_disc)
assert clf.states_ == 19
assert clf.depth_ == clf.states_
def test_TAN_random_head(data):
clf = TAN(random_state=17)
clf.fit(*data, head="random")
def test_TAN_random_head(clf, data_disc):
clf.fit(*data_disc, head="random")
assert clf.head_ == 3
def test_TAN_classifier(data, clf):
clf.fit(*data)
def test_TAN_classifier(data_disc, clf):
clf.fit(*data_disc)
attribs = [
"classes_",
"X_",
@@ -78,44 +66,43 @@ def test_TAN_classifier(data, clf):
]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
X = data_disc[0]
y = data_disc[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 147
assert sum(y == y_pred) == 146
@image_comparison(
baseline_images=["line_dashes_TAN"], remove_text=True, extensions=["png"]
)
def test_TAN_plot(data, clf):
def test_TAN_plot(data_disc, features, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
dataset = load_iris(as_frame=True)
clf.fit(*data, features=dataset["feature_names"], head=0)
clf.fit(*data_disc, features=features, head=0)
clf.plot("TAN Iris head=0")
def test_TAN_wrong_num_features(data, clf):
def test_TAN_wrong_num_features(data_disc, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
clf.fit(*data_disc, features=["feature_1", "feature_2"])
def test_TAN_wrong_hyperparam(data, clf):
def test_TAN_wrong_hyperparam(data_disc, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
clf.fit(*data_disc, wrong_param="wrong_param")
def test_TAN_head_out_of_range(data, clf):
def test_TAN_head_out_of_range(data_disc, clf):
with pytest.raises(ValueError, match="Head index out of range"):
clf.fit(*data, head=4)
clf.fit(*data_disc, head=4)
def test_TAN_error_size_predict(data, clf):
X, y = data
def test_TAN_error_size_predict(data_disc, clf):
X, y = data_disc
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))

bayesclass/tests/test_TANNew.py (new file)

@@ -0,0 +1,120 @@
import pytest
import numpy as np
from matplotlib.testing.decorators import image_comparison
from matplotlib.testing.conftest import mpl_test_settings
from bayesclass.clfs import TANNew
from .._version import __version__
@pytest.fixture
def clf():
return TANNew(random_state=17)
def test_TANNew_default_hyperparameters(data, clf):
# Test default values of hyperparameters
assert not clf.show_progress
assert clf.random_state == 17
clf = TANNew(show_progress=True)
assert clf.show_progress
assert clf.random_state is None
clf.fit(*data)
assert clf.head_ == 0
assert clf.class_name_ == "class"
assert clf.feature_names_in_ == [
"feature_0",
"feature_1",
"feature_2",
"feature_3",
]
def test_TANNew_version(clf):
"""Check TANNew version."""
assert __version__ == clf.version()
def test_TANNew_nodes_edges(clf, data):
assert clf.nodes_edges() == (0, 0)
clf.fit(*data, head="random")
assert clf.nodes_leaves() == (5, 7)
def test_TANNew_states(clf, data):
assert clf.states_ == 0
clf.fit(*data)
assert clf.states_ == 18
assert clf.depth_ == clf.states_
def test_TANNew_random_head(clf, data):
clf.fit(*data, head="random")
assert clf.head_ == 3
def test_TANNew_local_discretization(clf, data):
expected = [-1, [0, -1], [0, -1], [1, -1]]
clf.fit(*data)
for feature in range(4):
assert (
expected[feature] == clf.estimator_.discretizer_.target_[feature]
)
def test_TANNew_classifier(data, clf):
clf.fit(*data)
attribs = [
"classes_",
"X_",
"y_",
"head_",
"feature_names_in_",
"class_name_",
]
for attr in attribs:
assert hasattr(clf, attr)
X = data[0]
y = data[1]
y_pred = clf.predict(X)
assert y_pred.shape == (X.shape[0],)
assert sum(y == y_pred) == 146
@image_comparison(
baseline_images=["line_dashes_TANNew"],
remove_text=True,
extensions=["png"],
)
def test_TANNew_plot(data, features, clf):
# mpl_test_settings will automatically clean these internal side effects
mpl_test_settings
clf.fit(*data, features=features, head=0)
clf.plot("TANNew Iris head=0")
def test_TANNew_wrong_num_features(data, clf):
with pytest.raises(
ValueError,
match="Number of features does not match the number of columns in X",
):
clf.fit(*data, features=["feature_1", "feature_2"])
def test_TANNew_wrong_hyperparam(data, clf):
with pytest.raises(ValueError, match="Unexpected argument: wrong_param"):
clf.fit(*data, wrong_param="wrong_param")
def test_TANNew_head_out_of_range(data, clf):
with pytest.raises(ValueError, match="Head index out of range"):
clf.fit(*data, head=4)
def test_TANNew_error_size_predict(data, clf):
X, y = data
clf.fit(X, y)
with pytest.raises(ValueError):
X_diff_size = np.ones((10, X.shape[1] + 1))
clf.predict(X_diff_size)

bayesclass/tests/test_common.py

@@ -1,8 +1,23 @@
import pytest
import numpy as np
from sklearn.utils.estimator_checks import check_estimator
from bayesclass.clfs import TAN, KDB, AODE
from bayesclass.clfs import BayesBase, TAN, KDB, AODE
def test_more_tags():
expected = {
"requires_positive_X": True,
"requires_positive_y": True,
"preserve_dtype": [np.int32, np.int64],
"requires_y": True,
}
clf = BayesBase(None, True)
computed = clf._more_tags()
for key, value in expected.items():
assert key in computed
assert computed[key] == value
# @pytest.mark.parametrize("estimators", [TAN(), KDB(k=2), AODE()])

pyproject.toml

@@ -25,6 +25,7 @@ dependencies = [
"pgmpy",
"networkx",
"matplotlib",
"fimdlp",
]
requires-python = ">=3.8"
classifiers = [