# stree_datasets/experimentation/Experiments.py
import json
import os
import statistics
import time
import warnings

from sklearn.model_selection import GridSearchCV, KFold, cross_validate

from . import Models
from .Database import Hyperparameters, MySQL, Outcomes
from .Sets import Datasets


class Experiment:
    def __init__(
        self,
        random_state: int,
        model: str,
        host: str,
        set_of_files: str,
        kernel: str,
        threads: int = -1,
    ) -> None:
        self._random_state = random_state
        self._set_model(model)
        self._set_of_files = set_of_files
        self._clf = self._type(random_state=self._random_state)
        self._host = host
        # Used in grid search with ensembles: either reuse the best
        # hyperparams found for the base classifier or grid search those
        # hyperparams as well
        self._base_params = "any"
        self._kernel = kernel
        self._threads = threads

    def set_base_params(self, base_params: str) -> None:
        self._base_params = base_params

    def _set_model(self, model_name: str) -> None:
        self._model_name = model_name
        self._type = getattr(
            Models,
            f"Model{model_name[0].upper() + model_name[1:]}",
        )
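        # Hedged example of the lookup above (assuming the Models module
        # defines such a class): _set_model("stree") resolves self._type to
        # Models.ModelStree, which is instantiated later with
        # self._type(random_state=...).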

    def cross_validation(self, dataset: str) -> None:
        self._clf = self._type(random_state=self._random_state)
        model = self._clf.get_model()
        hyperparams = MySQL()
        hyperparams.get_connection()
        record = hyperparams.find_best(
            dataset, self._model_name, experiment="gridsearch"
        )
        hyperparams.close()
        if record is None:
            try:
                hyperparams = Hyperparameters(
                    host=self._host, model=self._model_name
                )
                parameters, normalize, standardize = hyperparams.get_params(
                    dataset
                )
            except ValueError:
                print(f"*** {dataset} not trained")
                return
        else:
            normalize = record[6]
            standardize = record[7]
            parameters = record[8]
        datasets = Datasets(
            normalize=normalize,
            standardize=standardize,
            set_of_files=self._set_of_files,
        )
        parameters = json.loads(parameters)
        X, y = datasets.load(dataset)
        # Re-initialize the classifier in case of consecutive experiments
        self._clf = self._type(random_state=self._random_state)
        model.set_params(**parameters)
        self._num_warnings = 0
        warnings.warn = self._warn
        # Run with 10 different seeds to smooth out the random effect
        total = {}
        outcomes = ["fit_time", "score_time", "train_score", "test_score"]
        for item in outcomes:
            total[item] = []
        nodes_total = []
        leaves_total = []
        depths_total = []
        # Each metric in total accumulates one array of 5 fold results per
        # seed (10 seeds x 5 folds = 50 estimates per metric)
        for random_state in [57, 31, 1714, 17, 23, 79, 83, 97, 7, 1]:
            kfold = KFold(shuffle=True, random_state=random_state, n_splits=5)
            model.set_params(**{"random_state": random_state})
            print(f"{random_state}, ", end="", flush=True)
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore")
                # Also silence warnings in subprocesses
                os.environ["PYTHONWARNINGS"] = "ignore"
                results = cross_validate(
                    model,
                    X,
                    y,
                    return_train_score=True,
                    return_estimator=True,
                    n_jobs=self._threads,
                    cv=kfold,
                )
            for item in outcomes:
                total[item].append(results[item])
            if type(model).__name__ == "Stree":
                for result_item in results["estimator"]:
                    nodes, leaves = result_item.nodes_leaves()
                    nodes_total.append(nodes)
                    leaves_total.append(leaves)
                    depths_total.append(result_item.depth_)
        if type(model).__name__ == "Stree":
            nodes = statistics.mean(nodes_total)
            leaves = statistics.mean(leaves_total)
            depth = statistics.mean(depths_total)
        else:
            nodes = leaves = depth = 0.0
        print("end")
        complexity = dict(nodes=nodes, leaves=leaves, depth=depth)
        outcomes = Outcomes(host=self._host, model=self._model_name)
        parameters = json.dumps(parameters, sort_keys=True)
        outcomes.store(
            dataset, normalize, standardize, parameters, total, complexity
        )
        if self._num_warnings > 0:
            print(f"{self._num_warnings} warnings have happened")

    def grid_search(
        self, dataset: str, normalize: bool, standardize: bool
    ) -> None:
        """If the model is an ensemble, first look up the best hyperparams
        found by the grid search of its base model and override normalize
        and standardize accordingly
        """
        hyperparams = Hyperparameters(host=self._host, model=self._model_name)
        model = self._clf.get_model()
        if self._kernel != "any":
            # Restrict the parameters grid to a single kernel
            if isinstance(self._clf, Models.Ensemble):
                self._clf._base_model.select_params(self._kernel)
            else:
                self._clf.select_params(self._kernel)
        hyperparameters = self._clf.get_parameters()
        grid_type = "gridsearch"
        if (
            isinstance(self._clf, Models.Ensemble)
            and self._base_params == "best"
        ):
            hyperparams_base = Hyperparameters(
                host=self._host, model=self._clf._base_model.get_model_name()
            )
            try:
                # Get the best hyperparameters obtained in the grid search of
                # the base classifier
                (
                    base_hyperparams,
                    normalize,
                    standardize,
                ) = hyperparams_base.get_params(dataset)
                # Merge them with the ensemble hyperparameters
                base_hyperparams = json.loads(base_hyperparams)
                hyperparameters = self._clf.merge_parameters(base_hyperparams)
                grid_type = "gridbest"
            except ValueError:
                pass
        dt = Datasets(
            normalize=normalize,
            standardize=standardize,
            set_of_files=self._set_of_files,
        )
        X, y = dt.load(dataset)
        self._num_warnings = 0
        warnings.warn = self._warn
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore")
            # Also silence warnings in subprocesses
            os.environ["PYTHONWARNINGS"] = "ignore"
            grid_search = GridSearchCV(
                model,
                return_train_score=True,
                param_grid=hyperparameters,
                n_jobs=self._threads,
                verbose=1,
            )
            start_time = time.time()
            grid_search.fit(X, y)
            time_spent = time.time() - start_time
        parameters = json.dumps(
            self._clf.modified_parameters(
                grid_search.best_estimator_.get_params()
            ),
            sort_keys=True,
        )
        if type(model).__name__ == "Stree":
            best_model = grid_search.best_estimator_
            nodes, leaves = best_model.nodes_leaves()
            depth = best_model.depth_
        else:
            nodes = leaves = depth = 0
        complexity = dict(nodes=nodes, leaves=leaves, depth=depth)
        hyperparams.store(
            dataset,
            time_spent,
            grid_search,
            parameters,
            normalize,
            standardize,
            grid_type,
            complexity,
        )
        if self._num_warnings > 0:
            print(f"{self._num_warnings} warnings have happened")

    def _warn(self, *args, **kwargs) -> None:
        # Count warnings instead of printing them (warnings.warn is patched
        # to this method before running an experiment)
        self._num_warnings += 1
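

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the model name,
    # host, set_of_files and dataset below are illustrative assumptions, not
    # values taken from this repository.
    experiment = Experiment(
        random_state=57,
        model="stree",
        host="localhost",
        set_of_files="default",
        kernel="any",
    )
    experiment.grid_search("iris", normalize=True, standardize=False)
    experiment.cross_validation("iris")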