"""Reports: console, best-results, Excel, SQL, benchmark and summary views."""
import os
import json
import abc
import shutil
import subprocess
import xlsxwriter
from tqdm import tqdm
from Experiments import Datasets, BestResults
from Utils import Folders, Files, Symbols, BEST_ACCURACY_STREE, TextColor


class BaseReport(abc.ABC):
    """Base class for reports built from a result JSON file."""

    def __init__(self, file_name, best_file=False):
        self.file_name = file_name
        if not os.path.isfile(file_name):
            raise ValueError(f"{file_name} does not exist!")
        with open(file_name) as f:
            self.data = json.load(f)
        self.best_acc_file = best_file
        self.lines = self.data if best_file else self.data["results"]

    def _get_accuracy(self, item):
        return self.data[item][0] if self.best_acc_file else item["score"]

    def report(self):
        self.header()
        accuracy_total = 0.0
        for result in self.lines:
            self.print_line(result)
            accuracy_total += self._get_accuracy(result)
        self.footer(accuracy_total)

    def _load_best_results(self, score, model):
        best = BestResults(score, model, Datasets())
        self.best_results = best.load({})

    def _compute_status(self, dataset, accuracy: float):
        best = self.best_results[dataset][0]
        status = " "
        if accuracy == best:
            status = Symbols.equal_best
        elif accuracy > best:
            status = Symbols.better_best
        if status != " ":
            if status not in self._compare_totals:
                self._compare_totals[status] = 1
            else:
                self._compare_totals[status] += 1
        return status

    @staticmethod
    def _status_meaning(status):
        meaning = {
            Symbols.equal_best: "Equal to best",
            Symbols.better_best: "Better than best",
        }
        return meaning[status]

    @abc.abstractmethod
    def header(self) -> None:
        pass

    @abc.abstractmethod
    def print_line(self, result) -> None:
        pass

    @abc.abstractmethod
    def footer(self, accuracy: float) -> None:
        pass


class Report(BaseReport):
    """Print a result file as a formatted console report."""

    header_lengths = [30, 5, 3, 3, 7, 7, 7, 15, 16, 15]
    header_cols = [
        "Dataset",
        "Samp",
        "Var",
        "Cls",
        "Nodes",
        "Leaves",
        "Depth",
        "Score",
        "Time",
        "Hyperparameters",
    ]

    def __init__(self, file_name: str, compare: bool = False):
        super().__init__(file_name)
        self.nline = 0
        self.compare = compare

    def header_line(self, text: str) -> None:
        print(TextColor.LINE1, end="")
        length = sum(self.header_lengths) + len(self.header_lengths) - 3
        if text == "*":
            print("*" * (length + 2))
        else:
            print(f"*{text:{length}s}*")

    def print_line(self, result) -> None:
        self.nline += 1
        text_color = (
            TextColor.LINE1 if self.nline % 2 == 0 else TextColor.LINE2
        )
        print(text_color, end="")
        hl = self.header_lengths
        i = 0
        print(f"{result['dataset']:{hl[i]}s} ", end="")
        i += 1
        print(f"{result['samples']:{hl[i]},d} ", end="")
        i += 1
        print(f"{result['features']:{hl[i]}d} ", end="")
        i += 1
        print(f"{result['classes']:{hl[i]}d} ", end="")
        i += 1
        print(f"{result['nodes']:{hl[i]}.2f} ", end="")
        i += 1
        print(f"{result['leaves']:{hl[i]}.2f} ", end="")
        i += 1
        print(f"{result['depth']:{hl[i]}.2f} ", end="")
        i += 1
        if self.compare:
            status = self._compute_status(result["dataset"], result["score"])
        else:
            status = " "
        print(
            f"{result['score']:8.6f}±{result['score_std']:6.4f}{status}",
            end="",
        )
        i += 1
        print(
            f"{result['time']:9.6f}±{result['time_std']:6.4f} ",
            end="",
        )
        i += 1
        print(f"{str(result['hyperparameters']):{hl[i]}s} ")

    def header(self) -> None:
        if self.compare:
            self._load_best_results(
                self.data["score_name"], self.data["model"]
            )
            self._compare_totals = {}
        self.header_line("*")
        self.header_line(
            f" Report {self.data['model']} ver. {self.data['version']}"
            f" with {self.data['folds']} Folds "
            f"cross validation and {len(self.data['seeds'])} random seeds"
        )
        self.header_line(f" {self.data['title']}")
        self.header_line(
            f" Random seeds: {self.data['seeds']} Stratified: "
            f"{self.data['stratified']}"
        )
        self.header_line(
            f" Execution took {self.data['duration']:7.2f} seconds on "
            f"{self.data['platform']}"
        )
        self.header_line(f" Score is {self.data['score_name']}")
        self.header_line("*")
        print("")
        line_col = ""
        for field, underscore in zip(self.header_cols, self.header_lengths):
            print(f"{field:{underscore}s} ", end="")
            line_col += "=" * underscore + " "
        print(f"\n{line_col}")

    def footer(self, accuracy: float) -> None:
        self.header_line("*")
        if self.compare:
            for key, value in self._compare_totals.items():
                self.header_line(
                    f" {key} {self._status_meaning(key)} .....: {value:2d}"
                )
        self.header_line(
            f" Accuracy compared to stree_default (liblinear-ovr) .: "
            f"{accuracy/BEST_ACCURACY_STREE:7.4f}"
        )
        self.header_line("*")
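

# Usage sketch (illustrative, not executed): print a console report from a
# result JSON file and compare each score against the stored best results.
# The file name below is a hypothetical example, not a file in this repo.
#
#   report = Report("results/results_accuracy_STree_....json", compare=True)
#   report.report()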


class ReportBest(BaseReport):
    """Print the best results stored for a given score and model."""

    header_lengths = [30, 8, 50, 35]
    header_cols = [
        "Dataset",
        "Score",
        "File",
        "Hyperparameters",
    ]

    def __init__(self, score, model):
        file_name = os.path.join(
            Folders.results, Files.best_results(score, model)
        )
        super().__init__(file_name, best_file=True)
        self.compare = False
        self.score_name = score
        self.model = model

    def header_line(self, text):
        length = sum(self.header_lengths) + len(self.header_lengths) - 3
        if text == "*":
            print("*" * (length + 2))
        else:
            print(f"*{text:{length}s}*")

    def print_line(self, result):
        hl = self.header_lengths
        print(f"{result:{hl[0]}s} ", end="")
        print(
            f"{self.data[result][0]:8.6f} ",
            end="",
        )
        print(
            f"{self.data[result][2]:{hl[2]}s} ",
            end="",
        )
        print(f"{str(self.data[result][1]):{hl[3]}s} ")

    def header(self):
        self.header_line("*")
        self.header_line(
            f" Report Best {self.score_name} Scores with {self.model} in any "
            "platform"
        )
        self.header_line("*")
        print("")
        line_col = ""
        for field, underscore in zip(self.header_cols, self.header_lengths):
            print(f"{field:{underscore}s} ", end="")
            line_col += "=" * underscore + " "
        print(f"\n{line_col}")

    def footer(self, accuracy):
        self.header_line("*")
        if self.compare:
            for key, value in self._compare_totals.items():
                self.header_line(
                    f" {key} {self._status_meaning(key)} .....: {value:2d}"
                )
        self.header_line(
            f" Scores compared to stree_default accuracy (liblinear-ovr) .: "
            f"{accuracy/BEST_ACCURACY_STREE:7.4f}"
        )
        self.header_line("*")


class Excel(BaseReport):
    """Export a result file to an Excel workbook."""

    row = 6

    def __init__(self, file_name, compare=False):
        super().__init__(file_name)
        self.compare = compare

    def get_file_name(self):
        return self.excel_file_name

    def header(self):
        if self.compare:
            self._load_best_results(
                self.data["score_name"], self.data["model"]
            )
            self._compare_totals = {}
        self.excel_file_name = self.file_name.replace(".json", ".xlsx")
        self.book = xlsxwriter.Workbook(self.excel_file_name)
        self.sheet = self.book.add_worksheet(self.data["model"])
        header = self.book.add_format()
        header.set_font_size(18)
        subheader = self.book.add_format()
        subheader.set_font_size(16)
        self.sheet.write(
            0,
            0,
            f" Report {self.data['model']} ver. {self.data['version']}"
            f" with {self.data['folds']} Folds "
            f"cross validation and {len(self.data['seeds'])} random seeds",
            header,
        )
        self.sheet.write(
            1,
            0,
            f" {self.data['title']}",
            subheader,
        )
        self.sheet.write(
            2,
            0,
            f" Execution took {self.data['duration']:7.2f} seconds on "
            f"{self.data['platform']}",
            subheader,
        )
        self.sheet.write(
            2,
            5,
            f"Random seeds: {self.data['seeds']}",
            subheader,
        )
        self.sheet.write(
            3, 0, f" Score is {self.data['score_name']}", subheader
        )
        self.sheet.write(
            3,
            5,
            f"Stratified: {self.data['stratified']}",
            subheader,
        )
        header_cols = [
            ("Dataset", 30),
            ("Samples", 10),
            ("Features", 7),
            ("Classes", 7),
            ("Nodes", 7),
            ("Leaves", 7),
            ("Depth", 7),
            ("Score", 12),
            ("Score Std.", 12),
            ("Time", 12),
            ("Time Std.", 12),
            ("Parameters", 50),
        ]
        if self.compare:
            header_cols.insert(8, ("Stat", 3))
        bold = self.book.add_format({"bold": True, "font_size": 14})
        i = 0
        for item, length in header_cols:
            self.sheet.write(5, i, item, bold)
            self.sheet.set_column(i, i, length)
            i += 1

    def print_line(self, result):
        size_n = 14
        decimal = self.book.add_format(
            {"num_format": "0.000000", "font_size": size_n}
        )
        integer = self.book.add_format(
            {"num_format": "#,###", "font_size": size_n}
        )
        normal = self.book.add_format({"font_size": size_n})
        col = 0
        self.sheet.write(self.row, col, result["dataset"], normal)
        self.sheet.write(self.row, col + 1, result["samples"], integer)
        self.sheet.write(self.row, col + 2, result["features"], normal)
        self.sheet.write(self.row, col + 3, result["classes"], normal)
        self.sheet.write(self.row, col + 4, result["nodes"], normal)
        self.sheet.write(self.row, col + 5, result["leaves"], normal)
        self.sheet.write(self.row, col + 6, result["depth"], normal)
        self.sheet.write(self.row, col + 7, result["score"], decimal)
        if self.compare:
            status = self._compute_status(result["dataset"], result["score"])
            self.sheet.write(self.row, col + 8, status, normal)
            col = 9
        else:
            col = 8
        self.sheet.write(self.row, col, result["score_std"], decimal)
        self.sheet.write(self.row, col + 1, result["time"], decimal)
        self.sheet.write(self.row, col + 2, result["time_std"], decimal)
        self.sheet.write(
            self.row, col + 3, str(result["hyperparameters"]), normal
        )
        self.row += 1

    def footer(self, accuracy):
        if self.compare:
            self.row += 2
            bold = self.book.add_format({"bold": True, "font_size": 16})
            for key, total in self._compare_totals.items():
                self.sheet.write(self.row, 1, key, bold)
                self.sheet.write(self.row, 2, total, bold)
                self.sheet.write(self.row, 3, self._status_meaning(key), bold)
                self.row += 1
        message = (
            f"** Accuracy compared to stree_default (liblinear-ovr) .: "
            f"{accuracy/BEST_ACCURACY_STREE:7.4f}"
        )
        bold = self.book.add_format({"bold": True, "font_size": 14})
        self.sheet.write(self.row + 1, 0, message, bold)
        for c in range(self.row + 2):
            self.sheet.set_row(c, 20)
        self.sheet.set_row(0, 25)
        self.book.close()
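

# Usage sketch (illustrative): the Excel report writes an .xlsx file next to
# the source .json (same name, different extension). The input path is a
# hypothetical example.
#
#   excel = Excel("results/some_result.json", compare=True)
#   excel.report()
#   print(excel.get_file_name())  # path of the generated workbook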


class SQL(BaseReport):
    table_name = "results"

    def header(self):
        file_name = self.file_name.replace(".json", ".sql")
        self.file = open(file_name, "w")

    def print_line(self, result):
        attributes = [
            "date",
            "time",
            "type",
            "title",
            "stratified",
            "score_name",
            "score",
            "score_std",
            "dataset",
            "classifier",
            "version",
            "norm",
            "stand",
            "time_spent",
            "time_spent_std",
            "parameters",
            "nodes",
            "leaves",
            "depth",
            "platform",
            "nfolds",
            "seeds",
        ]
        command_insert = (
            f"replace into {self.table_name} ("
            + ",".join(attributes)
            + ") values("
            + ("'%s'," * len(attributes))[:-1]
            + ");\n"
        )
        values = (
            self.data["date"],
            self.data["time"],
            "crossval",
            self.data["title"],
            "1" if self.data["stratified"] else "0",
            self.data["score_name"],
            result["score"],
            result["score_std"],
            result["dataset"],
            self.data["model"],
            self.data["version"],
            0,
            1,
            result["time"],
            result["time_std"],
            str(result["hyperparameters"]).replace("'", '"'),
            result["nodes"],
            result["leaves"],
            result["depth"],
            self.data["platform"],
            self.data["folds"],
            str(self.data["seeds"]),
        )
        self.file.write(command_insert % values)

    def footer(self, accuracy):
        self.file.close()
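

# The SQL report above writes one statement per result line, shaped like the
# following sketch; the column order follows the `attributes` list and every
# value is interpolated into a quoted '%s' placeholder (placeholders shown,
# not real data):
#
#   replace into results (date,time,type,title,...,nfolds,seeds)
#   values('<date>','<time>','crossval','<title>',...,'<folds>','<seeds>');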
self.data["score_name"], result["score"], result["score_std"], result["dataset"], self.data["model"], self.data["version"], 0, 1, result["time"], result["time_std"], str(result["hyperparameters"]).replace("'", '"'), result["nodes"], result["leaves"], result["depth"], self.data["platform"], self.data["folds"], str(self.data["seeds"]), ) self.file.write(command_insert % values) def footer(self, accuracy): self.file.close() class Benchmark: @staticmethod def get_result_file_name(score): return os.path.join(Folders.results, Files.exreport(score)) @staticmethod def _process_dataset(results, data): model = data["model"] for record in data["results"]: dataset = record["dataset"] if (model, dataset) in results: if record["score"] > results[model, dataset][0]: results[model, dataset] = ( record["score"], record["score_std"], ) else: results[model, dataset] = ( record["score"], record["score_std"], ) @staticmethod def compile_results(score): # build Files.exreport result_file_name = Benchmark.get_result_file_name(score) results = {} init_suffix, end_suffix = Files.results_suffixes(score=score) all_files = list(os.walk(Folders.results)) for root, _, files in tqdm(all_files, desc="files"): for name in files: if name.startswith(init_suffix) and name.endswith(end_suffix): file_name = os.path.join(root, name) with open(file_name) as fp: data = json.load(fp) Benchmark._process_dataset(results, data) with open(result_file_name, "w") as f: f.write(f"classifier, dataset, {score}, stdev\n") for (model, dataset), (accuracy, stdev) in results.items(): f.write(f"{model}, {dataset}, {accuracy}, {stdev}\n") @staticmethod def exreport(score): def end_message(message, file): length = 100 print("*" * length) print(message) print("*" * length) with open(os.path.join(Folders.results, file)) as f: data = f.read().splitlines() for line in data: print(line) # Remove previous results try: shutil.rmtree(Folders.report) os.remove(Files.exreport_pdf) except FileNotFoundError: pass except OSError as e: print("Error: %s : %s" % (Folders.report, e.strerror)) # Compute Friedman & Holm Tests fout = open( os.path.join(Folders.results, Files.exreport_output(score)), "w" ) ferr = open( os.path.join(Folders.results, Files.exreport_err(score)), "w" ) result = subprocess.run( ["Rscript", os.path.join(Folders.src, Files.benchmark_r), score], stdout=fout, stderr=ferr, ) fout.close() ferr.close() if result.returncode != 0: end_message("Error computing benchmark", Files.exreport_err(score)) else: end_message("Benchmark Ok", Files.exreport_output(score)) Files.open(Files.exreport_pdf) @staticmethod def build_results(score): # Build results data structure file_name = Benchmark.get_result_file_name(score) results = {} with open(file_name) as f: data = f.read().splitlines() data = data[1:] for line in data: model, dataset, accuracy, stdev = line.split(", ") if model not in results: results[model] = {} results[model][dataset] = (accuracy, stdev) return results @staticmethod def report(score): def show(results): datasets = results[list(results)[0]] print(f"{'Dataset':30s} ", end="") lines = "=" * 30 + " " for model in results: print(f"{model:^13s} ", end="") lines += "=" * 13 + " " print(f"\n{lines}") for dataset, _ in datasets.items(): print(f"{dataset:30s} ", end="") for model in results: print(f"{float(results[model][dataset][0]):.5f}±", end="") print(f"{float(results[model][dataset][1]):.3f} ", end="") print("") print(f"* Score is: {score}") show(Benchmark.build_results(score)) @staticmethod def get_excel_file_name(score): return 


class StubReport(BaseReport):
    """Minimal report used to load a result file without printing it."""

    def __init__(self, file_name):
        super().__init__(file_name=file_name, best_file=False)

    def print_line(self, line) -> None:
        pass

    def header(self) -> None:
        self.title = self.data["title"]

    def footer(self, accuracy: float) -> None:
        self.accuracy = accuracy


class Summary:
    """Collect every stored result and query the best ones."""

    def __init__(self) -> None:
        self.results = Files().get_all_results()
        self.data = []
        self.datasets = {}

    def acquire(self) -> None:
        """Get all results"""
        for result in self.results:
            (
                score,
                model,
                platform,
                date,
                time,
                stratified,
            ) = Files().split_file_name(result)
            report = StubReport(os.path.join(Folders.results, result))
            report.report()
            entry = dict(
                score=score,
                model=model,
                title=report.title,
                platform=platform,
                date=date,
                time=time,
                stratified=stratified,
                file=result,
                metric=report.accuracy / BEST_ACCURACY_STREE,
            )
            self.datasets[result] = report.lines
            self.data.append(entry)

    def list(self) -> None:
        """Print the list of results"""
        max_length = max(len(x["file"]) for x in self.data)
        print(
            "\n".join(
                [
                    f"{x['file']:{max_length}s} {x['metric']:7.3f} "
                    f"{x['title']}"
                    for x in self.data
                ]
            )
        )

    def show_result(self, data: dict, title: str = "") -> None:
        def whites(n: int) -> str:
            return " " * n + "*"

        if data == {}:
            print(f"** {title} has No data **")
            return
        file_name = data["file"]
        metric = data["metric"]
        result = StubReport(os.path.join(Folders.results, file_name))
        length = 80
        print("*" * length)
        if title != "":
            print(f"*{title:^{length - 2}s}*")
            print("*" + "-" * (length - 2) + "*")
        print("*" + whites(length - 2))
        print(f"* {result.data['title']:^{length - 4}} *")
        print("*" + whites(length - 2))
        print(
            f"* Model: {result.data['model']:15s} "
            f"Ver. {result.data['version']:10s} "
            f"Score: {result.data['score_name']:10s} "
            f"Metric: {metric:10.7f}" + whites(length - 78)
        )
        print("*" + whites(length - 2))
        print(
            f"* Date : {result.data['date']:15s} Time: "
            f"{result.data['time']:18s} Time Spent: "
            f"{result.data['duration']:9,.2f} secs." + whites(length - 78)
        )
        seeds = str(result.data["seeds"])
        seeds_len = len(seeds)
        print(
            f"* Seeds: {seeds:{seeds_len}s} Platform: "
            f"{result.data['platform']:17s} " + whites(length - 79)
        )
        print(
            f"* Stratified: {str(result.data['stratified']):15s}"
            + whites(length - 30)
        )
        print(f"* {file_name:60s}" + whites(length - 63))
        print("*" + whites(length - 2))
        print("*" * length)

    def best_result(
        self, criterion=None, value=None, score="accuracy"
    ) -> dict:
        # First filter the same score results (accuracy, f1, ...)
        haystack = [x for x in self.data if x["score"] == score]
        haystack = (
            haystack
            if criterion is None or value is None
            else [x for x in haystack if x[criterion] == value]
        )
        return (
            sorted(haystack, key=lambda x: x["metric"], reverse=True)[0]
            if len(haystack) > 0
            else {}
        )

    def best_results_datasets(self, score="accuracy") -> dict:
        """Get the best results for each dataset"""
        dt = Datasets()
        best_results = {}
        for dataset in dt:
            best_results[dataset] = (1, "", "", "")
        haystack = [x for x in self.data if x["score"] == score]
        # Search for the best results for each dataset
        for entry in haystack:
            for dataset in self.datasets[entry["file"]]:
                if dataset["score"] < best_results[dataset["dataset"]][0]:
                    best_results[dataset["dataset"]] = (
                        dataset["score"],
                        dataset["hyperparameters"],
                        entry["file"],
                        entry["title"],
                    )
        return best_results
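

# Usage sketch (illustrative): summarize every stored result and inspect the
# best one for a given model; "STree" is a hypothetical model name here.
#
#   summary = Summary()
#   summary.acquire()
#   summary.list()
#   best = summary.best_result(criterion="model", value="STree")
#   summary.show_result(best, title="Best STree result")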