import argparse import random import time from datetime import datetime import json import numpy as np from sklearn.tree import DecisionTreeClassifier from stree import Stree from sklearn.model_selection import KFold, cross_validate from experimentation.Sets import Datasets from experimentation.Database import MySQL from wodt import TreeClassifier from experimentation.Utils import TextColor def parse_arguments(): ap = argparse.ArgumentParser() ap.add_argument( "-S", "--set-of-files", type=str, choices=["aaai", "tanveer"], required=False, default="tanveer", ) ap.add_argument( "-m", "--model", type=str, required=False, default="stree_default", help="model name, default stree_default", ) ap.add_argument( "-d", "--dataset", type=str, required=True, help="dataset to process, all for everyone", ) ap.add_argument( "-s", "--sql", default=False, type=bool, required=False, help="generate report_score.sql", ) ap.add_argument( "-n", "--normalize", type=int, required=True, ) ap.add_argument( "-p", "--parameters", type=str, required=False, default="{}" ) args = ap.parse_args() return ( args.set_of_files, args.model, args.dataset, args.sql, bool(args.normalize), args.parameters, ) def get_classifier(model, random_state, hyperparameters): if model == "stree" or model == "stree_default": clf = Stree(random_state=random_state) clf.set_params(**hyperparameters) if model == "wodt": clf = TreeClassifier(random_state=random_state) if model == "cart": clf = DecisionTreeClassifier(random_state=random_state) return clf def process_dataset(dataset, verbose, model, params): X, y = dt.load(dataset) scores = [] times = [] nodes = [] leaves = [] depths = [] if verbose: print( f"* Processing dataset [{dataset}] from Set: {set_of_files} with " f"{model}" ) print(f"X.shape: {X.shape}") print(f"{X[:4]}") print(f"Random seeds: {random_seeds}") hyperparameters = json.loads(params) if model == "stree": # Get the optimized parameters record = dbh.find_best(dataset, model, "gridsearch") hyperparameters = json.loads(record[8] if record[8] != "" else "{}") hyperparameters.pop("random_state", None) for random_state in random_seeds: random.seed(random_state) np.random.seed(random_state) kfold = KFold(shuffle=True, random_state=random_state, n_splits=5) clf = get_classifier(model, random_state, hyperparameters) res = cross_validate(clf, X, y, cv=kfold, return_estimator=True) scores.append(res["test_score"]) times.append(res["fit_time"]) for result_item in res["estimator"]: if model == "cart": nodes_item = result_item.tree_.node_count depth_item = result_item.tree_.max_depth leaves_item = result_item.get_n_leaves() else: nodes_item, leaves_item = result_item.nodes_leaves() depth_item = result_item.depth_ nodes.append(nodes_item) leaves.append(leaves_item) depths.append(depth_item) if verbose: print( f"Random seed: {random_state:5d} Accuracy: " f"{res['test_score'].mean():6.4f}±" f"{res['test_score'].std():6.4f} " f"{res['fit_time'].mean():5.3f}s" ) return scores, times, json.dumps(hyperparameters), nodes, leaves, depths def store_string( dataset, model, accuracy, time_spent, hyperparameters, complexity ): attributes = [ "date", "time", "type", "accuracy", "accuracy_std", "dataset", "classifier", "norm", "stand", "time_spent", "time_spent_std", "parameters", "nodes", "leaves", "depth", ] command_insert = ( "replace into results (" + ",".join(attributes) + ") values(" + ("'%s'," * len(attributes))[:-1] + ");" ) now = datetime.now() date = now.strftime("%Y-%m-%d") time = now.strftime("%H:%M:%S") nodes, leaves, depth = complexity.values() values = ( date, time, "crossval", np.mean(accuracy), np.std(accuracy), dataset, model, 1, 0, np.mean(time_spent), np.std(time_spent), hyperparameters, nodes, leaves, depth, ) result = command_insert % values return result def compute_status(dbh, name, model, accuracy): better_default = "\N{heavy check mark}" better_stree = TextColor.GREEN + "\N{heavy check mark}" + TextColor.ENDC best = TextColor.RED + "\N{black star}" + TextColor.ENDC best_default, _ = get_best_score(dbh, name, model) best_stree, _ = get_best_score(dbh, name, "stree") best_all, _ = get_best_score(dbh, name, models_tree) status = better_default if accuracy >= best_default else " " status = better_stree if accuracy >= best_stree else status status = best if accuracy >= best_all else status return status def get_best_score(dbh, name, model): record = dbh.find_best(name, model, "crossval") accuracy = record[5] if record is not None else 0.0 acc_std = record[11] if record is not None else 0.0 return accuracy, acc_std random_seeds = [57, 31, 1714, 17, 23, 79, 83, 97, 7, 1] models_tree = [ "stree", "stree_default", "wodt", "j48svm", "oc1", "cart", "baseRaF", ] standardize = False (set_of_files, model, dataset, sql, normalize, parameters) = parse_arguments() dbh = MySQL() if sql: sql_output = open(f"{model}.sql", "w") database = dbh.get_connection() dt = Datasets(normalize, standardize, set_of_files) start = time.time() if dataset == "all": print( f"* Process all datasets set with {model}: {set_of_files} " f"norm: {normalize} std: {standardize} store in: {model}" ) print(f"5 Fold Cross Validation with 10 random seeds {random_seeds}\n") header_cols = [ "Dataset", "Samp", "Var", "Cls", "Nodes", "Leaves", "Depth", "Accuracy", "Time", "Parameters", ] header_lengths = [30, 5, 3, 3, 7, 7, 7, 15, 15, 10] parameters = json.dumps(json.loads(parameters)) if parameters != "{}" and len(parameters) > 10: header_lengths.pop() header_lengths.append(len(parameters)) line_col = "" for field, underscore in zip(header_cols, header_lengths): print(f"{field:{underscore}s} ", end="") line_col += "=" * underscore + " " print(f"\n{line_col}") for dataset in dt: name = dataset[0] X, y = dt.load(name) # type: ignore samples, features = X.shape classes = len(np.unique(y)) print( f"{name:30s} {samples:5d} {features:3d} {classes:3d} ", end="", ) scores, times, hyperparameters, nodes, leaves, depth = process_dataset( dataset[0], verbose=False, model=model, params=parameters ) complexity = dict( nodes=float(np.mean(nodes)), leaves=float(np.mean(leaves)), depth=float(np.mean(depth)), ) nodes_item, leaves_item, depth_item = complexity.values() print( f"{nodes_item:7.2f} {leaves_item:7.2f} {depth_item:7.2f} ", end="", ) accuracy = np.mean(scores) status = ( compute_status(dbh, name, model, accuracy) if model == "stree_default" else " " ) print(f"{accuracy:8.6f}±{np.std(scores):6.4f}{status}", end="") print(f"{np.mean(times):8.6f}±{np.std(times):6.4f} {hyperparameters}") if sql: command = store_string( name, model, scores, times, hyperparameters, complexity ) print(command, file=sql_output) else: scores, times, hyperparameters, nodes, leaves, depth = process_dataset( dataset, verbose=True, model=model, params=parameters ) best_accuracy, acc_best_std = get_best_score(dbh, dataset, model) accuracy = np.mean(scores) print(f"* Normalize/Standard.: {normalize} / {standardize}") print( f"* Accuracy Computed .: {accuracy:6.4f}±{np.std(scores):6.4f} " f"{np.mean(times):5.3f}s" ) print(f"* Best Accuracy model: {best_accuracy:6.4f}±{acc_best_std:6.4f}") print(f"* Difference ........: {best_accuracy - accuracy:6.4f}") best_accuracy, acc_best_std = get_best_score(dbh, dataset, models_tree) print(f"* Best Accuracy .....: {best_accuracy:6.4f}±{acc_best_std:6.4f}") print(f"* Difference ........: {best_accuracy - accuracy:6.4f}") print( f"* Nodes/Leaves/Depth : {np.mean(nodes):.2f} {np.mean(leaves):.2f} " f"{np.mean(depth):.2f} " ) print(f"- Hyperparameters ...: {hyperparameters}") stop = time.time() hours, rem = divmod(stop - start, 3600) minutes, seconds = divmod(rem, 60) print(f"Time: {int(hours):2d}h {int(minutes):2d}m {int(seconds):2d}s") if sql: sql_output.close() dbh.close()