Add new experimentation and parameter

2025-05-25 10:43:41 +02:00
parent cf8fd3454e
commit 0e724f2c6b
4 changed files with 358 additions and 30 deletions

View File

@@ -160,6 +160,15 @@ class Arguments(argparse.ArgumentParser):
"help": "Ignore nan results",
},
],
"iwss": [
("--iwss",),
{
"default": False,
"action": "store_true",
"required": False,
"help": "Do IWSS with training set and then apply to test set",
},
],
"key": [
("-k", "--key"),
{

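For context, the pattern this flag refers to, reconstructed from the MUFS calls that appear (commented out) in the Experiment changes below: the selector is fitted on the training split only and the resulting feature subset is then applied to both splits. The helper name, the discrete flag and the 0.5 threshold are illustrative, taken from the commented-out code rather than a tested configuration.

    from mufs import MUFS

    def iwss_select(X_train, y_train, X_test, threshold=0.5):
        # Fit IWSS on the training split only to avoid test-set leakage
        transformer = MUFS(discrete=True)
        transformer.iwss(X_train, y_train, threshold)
        selected = transformer.get_results()
        if len(selected) == 0:
            # Fall back to all features if nothing is selected
            return X_train, X_test
        # Apply the same feature subset to both splits
        return X_train[:, selected], X_test[:, selected]
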
View File

@@ -7,12 +7,17 @@ import time
from datetime import datetime
from tqdm import tqdm
import numpy as np
from mufs import MUFS
from sklearn.model_selection import (
StratifiedKFold,
KFold,
GridSearchCV,
cross_validate,
)
from sklearn.svm import LinearSVC
from sklearn.feature_selection import SelectFromModel
from sklearn.preprocessing import label_binarize
from sklearn.base import clone
from sklearn.metrics import check_scoring, roc_auc_score
from .Utils import Folders, Files, NO_RESULTS
from .Datasets import Datasets
from .Models import Models
@@ -115,6 +120,7 @@ class Experiment:
ignore_nan=True,
fit_features=None,
discretize=None,
iwss=False,
folds=5,
):
env_data = EnvData().load()
@@ -176,6 +182,7 @@ class Experiment:
self.random_seeds = Randomized.seeds()
self.results = []
self.duration = 0
self.iwss = iwss
self._init_experiment()
def get_output_file(self):
@@ -212,52 +219,362 @@ class Experiment:
res["state_names"] = states
return res
# def _n_fold_crossval(self, name, X, y, hyperparameters):
# if self.scores != []:
# raise ValueError("Must init experiment before!")
# loop = tqdm(
# self.random_seeds,
# position=1,
# leave=False,
# disable=not self.progress_bar,
# )
# for random_state in loop:
# loop.set_description(f"Seed({random_state:4d})")
# random.seed(random_state)
# np.random.seed(random_state)
# kfold = self.stratified_class(
# shuffle=True, random_state=random_state, n_splits=self.folds
# )
# clf = self._build_classifier(random_state, hyperparameters)
# fit_params = self._build_fit_params(name)
# self.version = Models.get_version(self.model_name, clf)
# with warnings.catch_warnings():
# warnings.filterwarnings("ignore")
# if self.iwss:
# # Manual cross-validation with IWSS feature selection
# fold_scores = []
# fold_times = []
# fold_estimators = []
# for train_idx, test_idx in kfold.split(X, y):
# # Split data
# X_train, X_test = X[train_idx], X[test_idx]
# y_train, y_test = y[train_idx], y[test_idx]
# # Apply IWSS feature selection
# transformer = MUFS()
# transformer.iwss(X_train, y_train, 0.5)
# X_train_selected = X_train[
# :, transformer.get_results()
# ]
# X_test_selected = X_test[:, transformer.get_results()]
# # print("Selected features:", transformer.get_results())
# # print(
# # f"Number of selected features: {X_train_selected.shape[1]}"
# # )
# # Clone classifier to avoid data leakage between folds
# clf_fold = clone(clf)
# # Fit the classifier
# start_time = time.time()
# clf_fold.fit(X_train_selected, y_train)
# fit_time = time.time() - start_time
# # Score on test set
# score_func = get_scorer(
# self.score_name.replace("-", "_")
# )
# # Handle scoring based on the metric type
# if self.score_name in [
# "roc_auc",
# "log_loss",
# "roc_auc_ovr",
# "roc_auc_ovo",
# ]:
# # These metrics need probabilities
# if hasattr(clf_fold, "predict_proba"):
# y_score = clf_fold.predict_proba(
# X_test_selected
# )
# # Handle missing classes in the fold
# if len(unique_train_classes) < len(
# unique_all_classes
# ):
# # Create a full probability matrix with zeros for missing classes
# y_score_full = np.zeros(
# (len(y_test), len(unique_all_classes))
# )
# for i, class_label in enumerate(
# unique_train_classes
# ):
# class_idx = np.where(
# unique_all_classes == class_label
# )[0][0]
# y_score_full[:, class_idx] = y_score[
# :, i
# ]
# y_score = y_score_full
# else:
# # Fallback to decision_function for SVM-like models
# y_score = clf_fold.decision_function(
# X_test_selected
# )
# test_score = score_func._score_func(
# y_test, y_score
# )
# else:
# # For metrics that use predictions (accuracy, f1, etc.)
# test_score = score_func(
# clf_fold, X_test_selected, y_test
# )
# fold_scores.append(test_score)
# fold_times.append(fit_time)
# fold_estimators.append(clf_fold)
# # Package results to match cross_validate output format
# res = {
# "test_score": np.array(fold_scores),
# "fit_time": np.array(fold_times),
# "estimator": fold_estimators,
# }
# else:
# # Original cross_validate approach
# res = cross_validate(
# clf,
# X,
# y,
# cv=kfold,
# fit_params=fit_params,
# return_estimator=True,
# scoring=self.score_name.replace("-", "_"),
# )
# # Handle NaN values
# if np.isnan(res["test_score"]).any():
# if not self.ignore_nan:
# print(res["test_score"])
# raise ValueError("NaN in results")
# results = res["test_score"][~np.isnan(res["test_score"])]
# else:
# results = res["test_score"]
# # Store results
# self.scores.extend(results)
# self.times.extend(res["fit_time"])
# for result_item in res["estimator"]:
# nodes_item, leaves_item, depth_item = (
# Models.get_complexity(self.model_name, result_item)
# )
# self.nodes.append(nodes_item)
# self.leaves.append(leaves_item)
# self.depths.append(depth_item)
def _n_fold_crossval(self, name, X, y, hyperparameters):
if self.scores != []:
raise ValueError("Must init experiment before!")
# Get all unique classes and their counts
unique_all_classes, class_counts = np.unique(y, return_counts=True)
n_classes = len(unique_all_classes)
# Warn when the smallest class cannot populate every fold
min_samples_per_class = class_counts.min()
if min_samples_per_class < self.folds:
    warnings.warn(
        f"The smallest class has only {min_samples_per_class} samples, "
        f"fewer than the {self.folds} requested folds. Consider using "
        "fewer folds or handling the class imbalance."
    )
loop = tqdm(
self.random_seeds,
position=1,
leave=False,
disable=not self.progress_bar,
)
for random_state in loop:
loop.set_description(f"Seed({random_state:4d})")
random.seed(random_state)
np.random.seed(random_state)
kfold = self.stratified_class(
shuffle=True, random_state=random_state, n_splits=self.folds
)
clf = self._build_classifier(random_state, hyperparameters)
fit_params = self._build_fit_params(name)
self.version = Models.get_version(self.model_name, clf)
# The IWSS path below scores every fold with ROC-AUC computed from
# predict_proba, so fail early if the classifier cannot provide it
if self.iwss and not hasattr(clf, "predict_proba"):
    raise ValueError(
        f"Classifier {self.model_name} doesn't support the probability "
        "predictions required for ROC-AUC scoring"
    )
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")
    if not self.iwss:
        # Standard path: let scikit-learn drive the cross-validation
        # with the configured scoring metric
        res = cross_validate(
            clf,
            X,
            y,
            cv=kfold,
            fit_params=fit_params,
            return_estimator=True,
            scoring=self.score_name.replace("-", "_"),
        )
        if np.isnan(res["test_score"]).any():
            if not self.ignore_nan:
                print(res["test_score"])
                raise ValueError("NaN in results")
            results = res["test_score"][~np.isnan(res["test_score"])]
        else:
            results = res["test_score"]
        self.scores.extend(results)
        self.times.extend(res["fit_time"])
        for result_item in res["estimator"]:
            nodes_item, leaves_item, depth_item = Models.get_complexity(
                self.model_name, result_item
            )
            self.nodes.append(nodes_item)
            self.leaves.append(leaves_item)
            self.depths.append(depth_item)
        # Move on to the next seed; the manual loop below is IWSS-only
        continue
    # IWSS path: run cross-validation manually so that feature
    # selection is fitted on each training fold only
    fold_scores = []
    fold_times = []
    fold_estimators = []
for fold_idx, (train_idx, test_idx) in enumerate(
kfold.split(X, y)
):
# Split data
X_train, X_test = X[train_idx], X[test_idx]
y_train, y_test = y[train_idx], y[test_idx]
# Check classes in this fold
unique_test_classes = np.unique(y_test)
n_test_classes = len(unique_test_classes)
# Skip fold if we don't have at least 2 classes in test set
if n_test_classes < 2:
warnings.warn(
f"Fold {fold_idx}: Test set has only {n_test_classes} class(es). "
f"Skipping this fold for ROC-AUC calculation."
)
fold_scores.append(np.nan)
fold_times.append(np.nan)
fold_estimators.append(None)
continue
# Feature selection, fitted on the training fold only (--iwss).
# Note: despite the flag name, this version uses L1-based selection
# (LinearSVC + SelectFromModel); the original MUFS/IWSS call is kept
# below for reference.
if self.iwss:
    # transformer = (
    #     MUFS(discrete=False)
    #     if "cli_rad" in name
    #     else MUFS(discrete=True)
    # )
    # transformer.iwss(X_train, y_train, 0.5)
    # selected_features = transformer.get_results()
    # L1-penalised LinearSVC: a smaller C keeps fewer features
lsvc = LinearSVC(
C=0.1, # Regularization parameter - adjust this for more/fewer features
penalty="l1",
dual=False,
max_iter=2000,
random_state=random_state,
)
selector = SelectFromModel(lsvc, prefit=False)
selector.fit(X_train, y_train)
# Indices of the features kept by the selector
selected_features = selector.get_support(indices=True)
if len(selected_features) == 0:
    warnings.warn(
        f"Fold {fold_idx}: no features selected. Using all features."
    )
    X_train_selected = X_train
    X_test_selected = X_test
else:
    # Apply the same feature subset to both splits
    X_train_selected = X_train[:, selected_features]
    X_test_selected = X_test[:, selected_features]
else:
X_train_selected = X_train
X_test_selected = X_test
# Clone and fit classifier
clf_fold = clone(clf)
start_time = time.time()
clf_fold.fit(X_train_selected, y_train)
fit_time = time.time() - start_time
# Get probability predictions
y_proba = clf_fold.predict_proba(X_test_selected)
# Calculate ROC-AUC score
# Handle case where test set doesn't have all classes
if len(clf_fold.classes_) != len(unique_test_classes):
# Map probabilities to only test classes
test_class_indices = [
np.where(clf_fold.classes_ == c)[0][0]
for c in unique_test_classes
if c in clf_fold.classes_
]
y_proba = y_proba[:, test_class_indices]
# Binarize labels for multi-class ROC-AUC
y_test_binarized = label_binarize(
y_test, classes=unique_test_classes
)
# Calculate ROC-AUC with OVR strategy
if n_test_classes == 2:
# Binary classification
test_score = roc_auc_score(y_test, y_proba[:, 1])
else:
# Multi-class with macro-average
test_score = roc_auc_score(
y_test_binarized,
y_proba,
multi_class="ovr",
average="macro",
)
fold_scores.append(test_score)
fold_times.append(fit_time)
fold_estimators.append(clf_fold)
# Filter out NaN scores if ignore_nan is True
scores_array = np.array(fold_scores)
times_array = np.array(fold_times)
if np.isnan(scores_array).any():
if not self.ignore_nan:
nan_folds = np.where(np.isnan(scores_array))[0]
raise ValueError(
f"NaN scores in folds {nan_folds}. "
f"Set ignore_nan=True to skip these folds."
)
else:
# Filter out NaN values
valid_mask = ~np.isnan(scores_array)
scores_array = scores_array[valid_mask]
times_array = times_array[valid_mask]
fold_estimators = [
e
for e, valid in zip(fold_estimators, valid_mask)
if valid
]
if len(scores_array) == 0:
warnings.warn(
f"All folds resulted in NaN for seed {random_state}. Skipping."
)
continue
# Store results
self.scores.extend(scores_array)
self.times.extend(times_array)
# Store complexity metrics
for estimator in fold_estimators:
if estimator is not None:
nodes_item, leaves_item, depth_item = (
Models.get_complexity(self.model_name, estimator)
)
self.nodes.append(nodes_item)
self.leaves.append(leaves_item)
self.depths.append(depth_item)
def _add_results(self, name, hyperparameters, samples, features, classes):
record = {}

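For review purposes, a condensed, self-contained sketch of the per-fold procedure the new method implements: fit an L1-based selector on the training fold, refit a clone of the classifier on the selected features, and score the test fold with ROC-AUC (binary or one-vs-rest). The dataset, classifier and parameter values are illustrative placeholders, not taken from this project.

    import numpy as np
    from sklearn.base import clone
    from sklearn.datasets import load_iris
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.feature_selection import SelectFromModel
    from sklearn.metrics import roc_auc_score
    from sklearn.model_selection import StratifiedKFold
    from sklearn.preprocessing import label_binarize
    from sklearn.svm import LinearSVC

    X, y = load_iris(return_X_y=True)
    clf = RandomForestClassifier(random_state=0)
    kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
    scores = []
    for train_idx, test_idx in kfold.split(X, y):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]
        # Fit the L1-based selector on the training fold only
        selector = SelectFromModel(
            LinearSVC(C=0.1, penalty="l1", dual=False, max_iter=2000)
        )
        selector.fit(X_train, y_train)
        keep = selector.get_support(indices=True)
        if len(keep) == 0:
            keep = np.arange(X_train.shape[1])  # fall back to all features
        # Train a fresh copy of the classifier on the selected features
        fold_clf = clone(clf).fit(X_train[:, keep], y_train)
        proba = fold_clf.predict_proba(X_test[:, keep])
        classes = np.unique(y_test)
        if len(classes) == 2:
            scores.append(roc_auc_score(y_test, proba[:, 1]))
        else:
            y_bin = label_binarize(y_test, classes=classes)
            scores.append(
                roc_auc_score(y_bin, proba, multi_class="ovr", average="macro")
            )
    print(np.mean(scores))
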
View File

@@ -71,6 +71,7 @@ class Models:
algorithm="SAMME",
random_state=random_state,
),
"AdaBoost": AdaBoostClassifier(random_state=random_state),
"GBC": GradientBoostingClassifier(random_state=random_state),
"RandomForest": RandomForestClassifier(random_state=random_state),
"Mock": MockModel(random_state=random_state),
@@ -99,13 +100,13 @@ class Models:
nodes = 0
leaves = result.get_n_leaves()
depth = 0
elif name.startswith("Bagging") or name.startswith("AdaBoost"):
elif name.startswith("Bagging") or name == "AdaBoostStree":
nodes, leaves = list(
zip(*[x.nodes_leaves() for x in result.estimators_])
)
nodes, leaves = mean(nodes), mean(leaves)
depth = mean([x.depth_ for x in result.estimators_])
elif name == "RandomForest":
elif name == "RandomForest" or name == "AdaBoost":
leaves = mean([x.get_n_leaves() for x in result.estimators_])
depth = mean([x.get_depth() for x in result.estimators_])
nodes = mean([x.tree_.node_count for x in result.estimators_])

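The AdaBoost branch now reuses the RandomForest-style complexity computation over estimators_. A small illustrative check of what those attributes return on a fitted ensemble, assuming mean here is statistics.mean or an equivalent; the dataset is a placeholder.

    from statistics import mean
    from sklearn.datasets import load_iris
    from sklearn.ensemble import AdaBoostClassifier

    X, y = load_iris(return_X_y=True)
    result = AdaBoostClassifier(random_state=0).fit(X, y)
    # Each base estimator is a decision tree exposing these attributes
    leaves = mean([x.get_n_leaves() for x in result.estimators_])
    depth = mean([x.get_depth() for x in result.estimators_])
    nodes = mean([x.tree_.node_count for x in result.estimators_])
    print(nodes, leaves, depth)
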
View File

@@ -14,7 +14,7 @@ def main(args_test=None):
arguments.xset("stratified").xset("score").xset("model", mandatory=True)
arguments.xset("n_folds").xset("platform").xset("quiet").xset("title")
arguments.xset("report").xset("ignore_nan").xset("discretize")
arguments.xset("fit_features")
arguments.xset("fit_features").xset("iwss")
arguments.add_exclusive(
["grid_paramfile", "best_paramfile", "hyperparameters"]
)
@@ -43,6 +43,7 @@ def main(args_test=None):
folds=args.n_folds,
fit_features=args.fit_features,
discretize=args.discretize,
iwss=args.iwss,
)
job.do_experiment()
except ValueError as e: