import os import json import random import warnings import time from datetime import datetime from tqdm import tqdm import numpy as np import pandas as pd from sklearn.model_selection import StratifiedKFold, KFold, cross_validate from Utils import Folders, Files from Models import Models from stree import Stree from wodt import Wodt from sklearn.tree import DecisionTreeClassifier class Randomized: seeds = [57, 31, 1714, 17, 23, 79, 83, 97, 7, 1] class Diterator: def __init__(self, data): self._stack = data.copy() def __next__(self): if len(self._stack) == 0: raise StopIteration() return self._stack.pop(0) class Datasets: def __init__(self): try: with open(os.path.join(Folders.data, Files.index)) as f: self.data_sets = f.read().splitlines() except FileNotFoundError: with open(os.path.join("..", Folders.data, Files.index)) as f: self.data_sets = f.read().splitlines() def load(self, name): try: data = pd.read_csv( os.path.join(Folders.data, Files.dataset(name)), sep="\t", index_col=0, ) except FileNotFoundError: data = pd.read_csv( os.path.join("..", Folders.data, Files.dataset(name)), sep="\t", index_col=0, ) X = data.drop("clase", axis=1).to_numpy() y = data["clase"].to_numpy() return X, y def __iter__(self) -> Diterator: return Diterator(self.data_sets) class BestResults: def __init__(self, score, model, datasets): self.score_name = score self.datasets = datasets self.model = model self.data = {} def _get_file_name(self): return os.path.join( Folders.results, Files.best_results(self.score_name, self.model) ) def load(self, dictionary): self.file_name = self._get_file_name() try: with open(self.file_name) as f: self.data = json.load(f) except FileNotFoundError: raise ValueError(f"{self.file_name} does not exist") return self.fill(dictionary, self.data) def fill(self, dictionary, data=None): if data is None: data = {} for dataset in self.datasets: if dataset not in data: data[dataset] = (0.0, dictionary, "") return data def _process_datafile(self, results, data, file_name): for record in data["results"]: dataset = record["dataset"] if dataset in results: if record["score"] >= results[dataset]["score"]: record["file_name"] = file_name results[dataset] = record else: record["file_name"] = file_name results[dataset] = record def build(self): results = {} init_suffix, end_suffix = Files.results_suffixes( score=self.score_name, model=self.model ) all_files = sorted(list(os.walk(Folders.results))) for root, _, files in tqdm(all_files, desc="files"): for name in files: if name.startswith(init_suffix) and name.endswith(end_suffix): file_name = os.path.join(root, name) with open(file_name) as fp: data = json.load(fp) self._process_datafile(results, data, name) # Build best results json file output = {} datasets = Datasets() for name in tqdm(list(datasets), desc="datasets"): output[name] = ( results[name]["score"], results[name]["hyperparameters"], results[name]["file_name"], ) self.data = output with open(self._get_file_name(), "w") as fp: json.dump(output, fp) class Experiment: def __init__( self, score_name, model_name, stratified, datasets, hyperparams_dict, hyperparams_file, platform, title, progress_bar=True, folds=5, ): today = datetime.now() self.time = today.strftime("%H:%M:%S") self.date = today.strftime("%Y-%m-%d") self.output_file = os.path.join( Folders.results, Files.results( score_name, model_name, platform, self.date, self.time, stratified, ), ) self.score_name = score_name self.model_name = model_name self.title = title self.stratified = stratified == "1" self.stratified_class = StratifiedKFold if self.stratified else KFold self.datasets = datasets dictionary = json.loads(hyperparams_dict) hyper = BestResults( score=score_name, model=model_name, datasets=datasets ) if hyperparams_file: self.hyperparameters_dict = hyper.load( dictionary=dictionary, ) else: self.hyperparameters_dict = hyper.fill( dictionary=dictionary, ) self.platform = platform self.progress_bar = progress_bar self.folds = folds self.random_seeds = Randomized.seeds self.results = [] self.duration = 0 self._init_experiment() def get_output_file(self): return self.output_file def _build_classifier(self, random_state, hyperparameters): self.model = Models.get_model(self.model_name, random_state) clf = self.model clf.set_params(**hyperparameters) clf.set_params(random_state=random_state) return clf def _init_experiment(self): self.scores = [] self.times = [] self.nodes = [] self.leaves = [] self.depths = [] def _n_fold_crossval(self, X, y, hyperparameters): if self.scores != []: raise ValueError("Must init experiment before!") loop = tqdm( self.random_seeds, position=1, leave=False, disable=not self.progress_bar, ) for random_state in loop: loop.set_description(f"Seed({random_state:4d})") random.seed(random_state) np.random.seed(random_state) kfold = self.stratified_class( shuffle=True, random_state=random_state, n_splits=self.folds ) clf = self._build_classifier(random_state, hyperparameters) self.version = clf.version() if hasattr(clf, "version") else "-" with warnings.catch_warnings(): warnings.filterwarnings("ignore") res = cross_validate( clf, X, y, cv=kfold, return_estimator=True, scoring=self.score_name, ) self.scores.append(res["test_score"]) self.times.append(res["fit_time"]) for result_item in res["estimator"]: nodes_item, leaves_item, depth_item = Models.get_complexity( self.model_name, result_item ) self.nodes.append(nodes_item) self.leaves.append(leaves_item) self.depths.append(depth_item) def _add_results(self, name, hyperparameters, samples, features, classes): record = {} record["dataset"] = name record["samples"] = samples record["features"] = features record["classes"] = classes record["hyperparameters"] = hyperparameters record["nodes"] = np.mean(self.nodes) record["leaves"] = np.mean(self.leaves) record["depth"] = np.mean(self.depths) record["score"] = np.mean(self.scores) record["score_std"] = np.std(self.scores) record["time"] = np.mean(self.times) record["time_std"] = np.std(self.times) self.results.append(record) def _output_results(self): output = {} output["score_name"] = self.score_name output["title"] = self.title output["model"] = self.model_name output["version"] = self.version output["stratified"] = self.stratified output["folds"] = self.folds output["date"] = self.date output["time"] = self.time output["duration"] = self.duration output["seeds"] = self.random_seeds output["platform"] = self.platform output["results"] = self.results with open(self.output_file, "w") as f: json.dump(output, f) def do_experiment(self): now = time.time() loop = tqdm( list(self.datasets), position=0, disable=not self.progress_bar, ) for name in loop: loop.set_description(f"{name:30s}") X, y = self.datasets.load(name) samp, feat = X.shape n_classes = len(np.unique(y)) hyperparameters = self.hyperparameters_dict[name][1] self._init_experiment() self._n_fold_crossval(X, y, hyperparameters) self._add_results(name, hyperparameters, samp, feat, n_classes) self.duration = time.time() - now self._output_results() if self.progress_bar: print(f"Results in {self.output_file}")