diff --git a/src/Platform/GridSearch.cc b/src/Platform/GridSearch.cc index 4f1f6e7..e196f7b 100644 --- a/src/Platform/GridSearch.cc +++ b/src/Platform/GridSearch.cc @@ -96,29 +96,32 @@ namespace platform { return Colors::RESET(); } } - json GridSearch::buildTasks() + json GridSearch::build_tasks_mpi() { - auto result = json::array(); + auto tasks = json::array(); + auto grid = GridData(Paths::grid_input(config.model)); auto datasets = Datasets(false, Paths::datasets()); auto datasets_names = processDatasets(datasets); - auto grid = GridData(Paths::grid_input(config.model)); for (const auto& dataset : datasets_names) { for (const auto& seed : config.seeds) { auto combinations = grid.getGrid(dataset); - for (const auto& hyperparam_line : combinations) { - auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line); + for (int n_fold = 0; n_fold < config.n_folds; n_fold++) { json task = { { "dataset", dataset }, { "seed", seed }, - { "hyperparameters", hyperparameters.get(dataset) } + { "fold", n_fold} }; - result.push_back(task); + tasks.push_back(task); } } } - return result; + // It's important to shuffle the array so heavy datasets are spread across the Workers + std::random_device rd; + std::mt19937 g(rd()); + std::shuffle(tasks.begin(), tasks.end(), g); + return tasks; } - std::pair GridSearch::partRange(int n_tasks, int nprocs, int rank) + std::pair GridSearch::part_range_mpi(int n_tasks, int nprocs, int rank) { int assigned = 0; int remainder = n_tasks % nprocs; @@ -140,11 +143,98 @@ namespace platform { { std::cout << "* (" << config_mpi.rank << "): " << status << std::endl; } - void GridSearch::go_MPI(struct ConfigMPI& config_mpi) + void GridSearch::process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results) + { + // Process the task and store the result in the results json + auto grid = GridData(Paths::grid_input(config.model)); + auto dataset = task["dataset"].get(); + auto seed = task["seed"].get(); + auto n_fold = task["fold"].get(); + // Generate the hyperparamters combinations + auto combinations = grid.getGrid(dataset); + status(config_mpi, "Processing dataset " + dataset + " with seed " + std::to_string(seed) + " and fold " + std::to_string(n_fold)); + auto [X, y] = datasets.getTensors(dataset); + auto states = datasets.getStates(dataset); + auto features = datasets.getFeatures(dataset); + auto className = datasets.getClassName(dataset); + // + // Start working on task + // + Fold* fold; + if (config.stratified) + fold = new StratifiedKFold(config.n_folds, y, seed); + else + fold = new KFold(config.n_folds, y.size(0), seed); + auto [train, test] = fold->getFold(n_fold); + auto train_t = torch::tensor(train); + auto test_t = torch::tensor(test); + auto X_train = X.index({ "...", train_t }); + auto y_train = y.index({ train_t }); + auto X_test = X.index({ "...", test_t }); + auto y_test = y.index({ test_t }); + auto num = 0; + double best_fold_score = 0.0; + json best_fold_hyper; + for (const auto& hyperparam_line : combinations) { + //status(config_mpi, "* Dataset: " + dataset + " Fold: " + std::to_string(n_fold) + " Processing hyperparameters: " + std::to_string(++num) + "/" + std::to_string(combinations.size())); + auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line); + Fold* nested_fold; + if (config.stratified) + nested_fold = new StratifiedKFold(config.nested, y_train, seed); + else + nested_fold = new KFold(config.nested, y_train.size(0), seed); + double score = 0.0; + for (int n_nested_fold = 0; n_nested_fold < config.nested; n_nested_fold++) { + // Nested level fold + auto [train_nested, test_nested] = nested_fold->getFold(n_nested_fold); + auto train_nested_t = torch::tensor(train_nested); + auto test_nested_t = torch::tensor(test_nested); + auto X_nested_train = X_train.index({ "...", train_nested_t }); + auto y_nested_train = y_train.index({ train_nested_t }); + auto X_nested_test = X_train.index({ "...", test_nested_t }); + auto y_nested_test = y_train.index({ test_nested_t }); + // Build Classifier with selected hyperparameters + auto clf = Models::instance()->create(config.model); + auto valid = clf->getValidHyperparameters(); + hyperparameters.check(valid, dataset); + clf->setHyperparameters(hyperparameters.get(dataset)); + // Train model + clf->fit(X_nested_train, y_nested_train, features, className, states); + // Test model + score += clf->score(X_nested_test, y_nested_test); + } + delete nested_fold; + score /= config.nested; + if (score > best_fold_score) { + best_fold_score = score; + best_fold_hyper = hyperparam_line; + } + } + delete fold; + // Build Classifier with the best hyperparameters to obtain the best score + auto hyperparameters = platform::HyperParameters(datasets.getNames(), best_fold_hyper); + auto clf = Models::instance()->create(config.model); + auto valid = clf->getValidHyperparameters(); + hyperparameters.check(valid, dataset); + clf->setHyperparameters(best_fold_hyper); + clf->fit(X_train, y_train, features, className, states); + best_fold_score = clf->score(X_test, y_test); + // Save results + results[dataset][std::to_string(n_fold)]["score"] = best_fold_score; + results[dataset][std::to_string(n_fold)]["hyperparameters"] = best_fold_hyper; + results[dataset][std::to_string(n_fold)]["hyperparameters"] = seed; + status(config_mpi, "Finished dataset " + dataset + " with seed " + std::to_string(seed) + " and fold " + std::to_string(n_fold) + " score " + std::to_string(best_fold_score)); + } + void GridSearch::go_mpi(struct ConfigMPI& config_mpi) { /* - * Manager will do the loops dataset, seed, fold (primary) and hyperparameter - * Workers will do the loop fold (nested) + * Each task is a json object with the following structure: + * { + * "dataset": "dataset_name", + * "seed": # of seed to use, + * "model": "model_name", + * "Fold": # of fold to process + * } * * The overall process consists in these steps: * 1. Manager will broadcast the tasks to all the processes @@ -152,18 +242,18 @@ namespace platform { * 1.2 Broadcast the length of the following string * 1.2 Broadcast the tasks as a char* string * 2. Workers will receive the tasks and start the process - * 2.1 A method will tell each worker the range of combinations to process - * 2.2 Each worker will process the combinations and return the best score obtained - * 3. Manager gather the scores from all the workers and get the best hyperparameters - * 3.1 Manager find out which worker has the best score - * 3.2 Manager broadcast the winner worker - * 3.3 The winner worker send the best hyperparameters to manager - * + * 2.1 A method will tell each worker the range of tasks to process + * 2.2 Each worker will process the tasks and generate the best score for each task + * 3. Manager gather the scores from all the workers and find out the best hyperparameters for each dataset + * 3.1 Obtain the maximum size of the results message of all the workers + * 3.2 Gather all the results from the workers into the manager + * 3.3 Compile the results from all the workers + * 3.4 Filter the best hyperparameters for each dataset */ char* msg; int tasks_size; if (config_mpi.rank == config_mpi.manager) { - auto tasks = buildTasks(); + auto tasks = build_tasks_mpi(); auto tasks_str = tasks.dump(); tasks_size = tasks_str.size(); msg = new char[tasks_size + 1]; @@ -183,75 +273,66 @@ namespace platform { // 2. All Workers will receive the tasks and start the process // int num_tasks = tasks.size(); - auto [start, end] = partRange(num_tasks, config_mpi.n_procs, config_mpi.rank); - // 2.2 Each worker will process the combinations and return the best score obtained + // 2.1 A method will tell each worker the range of tasks to process + auto [start, end] = part_range_mpi(num_tasks, config_mpi.n_procs, config_mpi.rank); + // 2.2 Each worker will process the tasks and return the best scores obtained auto datasets = Datasets(config.discretize, Paths::datasets()); + json results; for (int i = start; i < end; ++i) { - auto task = tasks[i]; - auto dataset = task["dataset"].get(); - auto seed = task["seed"].get(); - auto hyperparam_line = task["hyperparameters"]; - status(config_mpi, "Processing dataset " + dataset + " with seed " + std::to_string(seed) + " and hyperparameters " + hyperparam_line.dump()); - auto [X, y] = datasets.getTensors(dataset); - auto states = datasets.getStates(dataset); - auto features = datasets.getFeatures(dataset); - auto className = datasets.getClassName(dataset); - double bestScore = 0.0; - json bestHyperparameters; - // First level fold - Fold* fold; - if (config.stratified) - fold = new StratifiedKFold(config.n_folds, y, seed); - else - fold = new KFold(config.n_folds, y.size(0), seed); - for (int nfold = 0; nfold < config.n_folds; nfold++) { - status(config_mpi, "Processing fold " + std::to_string(nfold + 1)); - auto [train, test] = fold->getFold(nfold); - auto train_t = torch::tensor(train); - auto test_t = torch::tensor(test); - auto X_train = X.index({ "...", train_t }); - auto y_train = y.index({ train_t }); - auto X_test = X.index({ "...", test_t }); - auto y_test = y.index({ test_t }); - auto num = 0; - json result_fold; - double hypScore = 0.0; - double bestHypScore = 0.0; - json bestHypHyperparameters; - Fold* nested_fold; - if (config.stratified) - nested_fold = new StratifiedKFold(config.nested, y_train, seed); - else - nested_fold = new KFold(config.nested, y_train.size(0), seed); - for (int n_nested_fold = 0; n_nested_fold < config.nested; n_nested_fold++) { - // Nested level fold - status(config_mpi, "Processing nested fold " + std::to_string(n_nested_fold + 1)); - auto [train_nested, test_nested] = nested_fold->getFold(n_nested_fold); - auto train_nested_t = torch::tensor(train_nested); - auto test_nested_t = torch::tensor(test_nested); - auto X_nexted_train = X_train.index({ "...", train_nested_t }); - auto y_nested_train = y_train.index({ train_nested_t }); - auto X_nested_test = X_train.index({ "...", test_nested_t }); - auto y_nested_test = y_train.index({ test_nested_t }); - // Build Classifier with selected hyperparameters - auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line); - auto clf = Models::instance()->create(config.model); - auto valid = clf->getValidHyperparameters(); - hyperparameters.check(valid, dataset); - clf->setHyperparameters(hyperparameters.get(dataset)); - // Train model - clf->fit(X_nexted_train, y_nested_train, features, className, states); - // Test model - hypScore += clf->score(X_nested_test, y_nested_test); - } - delete nested_fold; - hypScore /= config.nested; - if (hypScore > bestHypScore) { - bestHypScore = hypScore; - bestHypHyperparameters = hyperparam_line; + // Process task + process_task_mpi(config_mpi, tasks[i], datasets, results); + } + int size = results.dump().size() + 1; + int max_size = 0; + // + // 3. Manager gather the scores from all the workers and find out the best hyperparameters for each dataset + // + //3.1 Obtain the maximum size of the results message of all the workers + MPI_Reduce(&size, &max_size, 1, MPI_INT, MPI_MAX, config_mpi.manager, MPI_COMM_WORLD); + // Assign the memory to the message and initialize it to 0s + char* total; + msg = new char[max_size] {}; + strncpy(msg, results.dump().c_str(), size); + if (config_mpi.rank == config_mpi.manager) { + total = new char[max_size * config_mpi.n_procs] {}; + } + // 3.2 Gather all the results from the workers into the manager + MPI_Gather(msg, max_size, MPI_CHAR, total, max_size * config_mpi.n_procs, MPI_CHAR, config_mpi.manager, MPI_COMM_WORLD); + delete[] msg; + if (config_mpi.rank == config_mpi.manager) { + json total_results; + json best_results; + // 3.3 Compile the results from all the workers + for (int i = 0; i < config_mpi.n_procs; ++i) { + json partial_results = json::parse(total + i * max_size); + for (auto& [dataset, folds] : partial_results.items()) { + for (auto& [fold, result] : folds.items()) { + total_results[dataset][fold] = result; + } } } - delete fold; + delete[] total; + // 3.4 Filter the best hyperparameters for each dataset + auto grid = GridData(Paths::grid_input(config.model)); + for (auto& [dataset, folds] : total_results.items()) { + double best_score = 0.0; + json best_hyper; + for (auto& [fold, result] : folds.items()) { + if (result["score"] > best_score) { + best_score = result["score"]; + best_hyper = result["hyperparameters"]; + } + } + json result = { + { "score", best_score }, + { "hyperparameters", best_hyper }, + { "date", get_date() + " " + get_time() }, + { "grid", grid.getInputGrid(dataset) }, + { "duration", 0 } + }; + best_results[dataset] = result; + } + save(total_results); } } void GridSearch::go() diff --git a/src/Platform/GridSearch.h b/src/Platform/GridSearch.h index 4c757fa..c00b2ee 100644 --- a/src/Platform/GridSearch.h +++ b/src/Platform/GridSearch.h @@ -34,7 +34,7 @@ namespace platform { public: explicit GridSearch(struct ConfigGrid& config); void go(); - void go_MPI(struct ConfigMPI& config_mpi); + void go_mpi(struct ConfigMPI& config_mpi); ~GridSearch() = default; json getResults(); static inline std::string NO_CONTINUE() { return "NO_CONTINUE"; } @@ -45,8 +45,9 @@ namespace platform { pair processFileSingle(std::string fileName, Datasets& datasets, std::vector& combinations); pair processFileNested(std::string fileName, Datasets& datasets, std::vector& combinations); struct ConfigGrid config; - pair partRange(int n_tasks, int nprocs, int rank); - json buildTasks(); + pair part_range_mpi(int n_tasks, int nprocs, int rank); + json build_tasks_mpi(); + void process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results); Timer timer; // used to measure the time of the whole process }; } /* namespace platform */ diff --git a/src/Platform/b_grid.cc b/src/Platform/b_grid.cc index e5bcd03..4439192 100644 --- a/src/Platform/b_grid.cc +++ b/src/Platform/b_grid.cc @@ -218,7 +218,7 @@ int main(int argc, char** argv) MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &mpi_config.rank); MPI_Comm_size(MPI_COMM_WORLD, &mpi_config.n_procs); - grid_search.go_MPI(mpi_config); + grid_search.go_mpi(mpi_config); MPI_Finalize(); } else { grid_search.go();