diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4d1bc2a..e33b67c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -25,12 +25,18 @@ set(CMAKE_CXX_EXTENSIONS OFF)
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TORCH_CXX_FLAGS}")
 SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")
-
 # Options
 # -------
 option(ENABLE_CLANG_TIDY "Enable to add clang tidy." OFF)
 option(ENABLE_TESTING "Unit testing build" OFF)
 option(CODE_COVERAGE "Collect coverage from test library" OFF)
+option(MPI_ENABLED "Enable MPI options" ON)
+
+if (MPI_ENABLED)
+    find_package(MPI REQUIRED)
+    message("MPI_CXX_LIBRARIES=${MPI_CXX_LIBRARIES}")
+    message("MPI_CXX_INCLUDE_DIRS=${MPI_CXX_INCLUDE_DIRS}")
+endif (MPI_ENABLED)
 
 # Boost Library
 set(Boost_USE_STATIC_LIBS OFF)
diff --git a/README.md b/README.md
index 2acf581..6ddd7c1 100644
--- a/README.md
+++ b/README.md
@@ -8,6 +8,26 @@ Bayesian Network Classifier with libtorch from scratch
 
 Before compiling BayesNet.
 
+### MPI
+
+On Linux, just install the openmpi & openmpi-devel packages. Only if cmake can't find the openmpi installation (as happens in Oracle Linux), set the following variable:
+
+```bash
+export MPI_HOME="/usr/lib64/openmpi"
+```
+
+On Mac OS X, install mpich with brew; if cmake doesn't find it, edit the mpicxx wrapper and remove ",-commons,use_dylibs" from final_ldflags:
+
+```bash
+vi /opt/homebrew/bin/mpicxx
+```
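+
+Once built, b_grid can be launched through mpirun. A minimal sketch of the invocation (the process count and the --nested value are illustrative; add your model selection and other options as needed, see b_grid --help):
+
+```bash
+mpirun -np 4 b_grid --compute --nested 5 --mpi
+```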
+
 ### boost library
 
 [Getting Started]()
diff --git a/src/Platform/CMakeLists.txt b/src/Platform/CMakeLists.txt
index d35989f..e475b16 100644
--- a/src/Platform/CMakeLists.txt
+++ b/src/Platform/CMakeLists.txt
@@ -7,6 +7,7 @@ include_directories(${BayesNet_SOURCE_DIR}/lib/argparse/include)
 include_directories(${BayesNet_SOURCE_DIR}/lib/json/include)
 include_directories(${BayesNet_SOURCE_DIR}/lib/libxlsxwriter/include)
 include_directories(${Python3_INCLUDE_DIRS})
+include_directories(${MPI_CXX_INCLUDE_DIRS})
 
 add_executable(b_best b_best.cc BestResults.cc Result.cc Statistics.cc BestResultsExcel.cc ReportExcel.cc ReportBase.cc Datasets.cc Dataset.cc ExcelFile.cc)
 add_executable(b_grid b_grid.cc GridSearch.cc GridData.cc HyperParameters.cc Folding.cc Datasets.cc Dataset.cc)
@@ -15,7 +16,7 @@ add_executable(b_main b_main.cc Folding.cc Experiment.cc Datasets.cc Dataset.cc
 add_executable(b_manage b_manage.cc Results.cc ManageResults.cc CommandParser.cc Result.cc ReportConsole.cc ReportExcel.cc ReportBase.cc Datasets.cc Dataset.cc ExcelFile.cc)
 
 target_link_libraries(b_best Boost::boost "${XLSXWRITER_LIB}" "${TORCH_LIBRARIES}" ArffFiles mdlp)
-target_link_libraries(b_grid BayesNet PyWrap)
+target_link_libraries(b_grid BayesNet PyWrap ${MPI_CXX_LIBRARIES})
 target_link_libraries(b_list ArffFiles mdlp "${TORCH_LIBRARIES}")
 target_link_libraries(b_main BayesNet ArffFiles mdlp "${TORCH_LIBRARIES}" PyWrap)
 target_link_libraries(b_manage "${TORCH_LIBRARIES}" "${XLSXWRITER_LIB}" ArffFiles mdlp)
\ No newline at end of file
diff --git a/src/Platform/GridSearch.cc b/src/Platform/GridSearch.cc
index 9f91c6a..76c4b4c 100644
--- a/src/Platform/GridSearch.cc
+++ b/src/Platform/GridSearch.cc
@@ -38,6 +38,39 @@ namespace platform {
         }
         return json();
     }
+    vector<string> GridSearch::processDatasets(Datasets& datasets)
+    {
+        // Load datasets
+        auto datasets_names = datasets.getNames();
+        if (config.continue_from != NO_CONTINUE()) {
+            // Continue previous execution:
+            if (std::find(datasets_names.begin(), datasets_names.end(), config.continue_from) == datasets_names.end()) {
+                throw std::invalid_argument("Dataset " + config.continue_from + " not found");
+            }
+            // Remove datasets already processed
+            vector< string >::iterator it = datasets_names.begin();
+            while (it != datasets_names.end()) {
+                if (*it != config.continue_from) {
+                    it = datasets_names.erase(it);
+                } else {
+                    if (config.only)
+                        ++it;
+                    else
+                        break;
+                }
+            }
+        }
+        // Exclude datasets
+        for (const auto& name : config.excluded) {
+            auto dataset = name.get<std::string>();
+            auto it = std::find(datasets_names.begin(), datasets_names.end(), dataset);
+            if (it == datasets_names.end()) {
+                throw std::invalid_argument("Dataset " + dataset + " already excluded or doesn't exist!");
+            }
+            datasets_names.erase(it);
+        }
+        return datasets_names;
+    }
     void showProgressComb(const int num, const int n_folds, const int total, const std::string& color)
     {
         int spaces = int(log(total) / log(10)) + 1;
@@ -63,6 +96,261 @@ namespace platform {
             return Colors::RESET();
         }
     }
+    json GridSearch::build_tasks_mpi()
+    {
+        auto tasks = json::array();
+        auto grid = GridData(Paths::grid_input(config.model));
+        auto datasets = Datasets(false, Paths::datasets());
+        auto datasets_names = processDatasets(datasets);
+        for (const auto& dataset : datasets_names) {
+            for (const auto& seed : config.seeds) {
+                auto combinations = grid.getGrid(dataset);
+                for (int n_fold = 0; n_fold < config.n_folds; n_fold++) {
+                    json task = {
+                        { "dataset", dataset },
+                        { "seed", seed },
+                        { "fold", n_fold }
+                    };
+                    tasks.push_back(task);
+                }
+            }
+        }
+        // It's important to shuffle the array so that heavy datasets are spread across the workers
+        std::mt19937 g{ 271 }; // Use a fixed seed to obtain the same shuffle on every run
+        std::shuffle(tasks.begin(), tasks.end(), g);
+        std::cout << "Tasks size: " << tasks.size() << std::endl;
+        std::cout << "|";
+        for (int i = 0; i < tasks.size(); ++i) {
+            std::cout << (i + 1) % 10;
+        }
+        std::cout << "|" << std::endl << "|" << std::flush;
+        return tasks;
+    }
+    std::pair<int, int> GridSearch::part_range_mpi(int n_tasks, int nprocs, int rank)
+    {
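+        // Block-partition the [0, n_tasks) range among nprocs ranks, giving the
+        // first n_tasks % nprocs ranks one extra task each. For example, with
+        // n_tasks=10 and nprocs=4 the ranks get [0,3), [3,6), [6,8) and [8,10).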
+        int assigned = 0;
+        int remainder = n_tasks % nprocs;
+        int start = 0;
+        if (rank < remainder) {
+            assigned = n_tasks / nprocs + 1;
+        } else {
+            assigned = n_tasks / nprocs;
+            start = remainder;
+        }
+        start += rank * assigned;
+        int end = start + assigned;
+        if (rank == nprocs - 1) {
+            end = n_tasks;
+        }
+        return { start, end };
+    }
+    std::string get_color_rank(int rank)
+    {
+        auto colors = { Colors::RED(), Colors::GREEN(), Colors::BLUE(), Colors::MAGENTA(), Colors::CYAN() };
+        return *(colors.begin() + rank % colors.size());
+    }
+    void GridSearch::process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results)
+    {
+        // Process the task and store the result in the results json
+        Timer timer;
+        timer.start();
+        auto grid = GridData(Paths::grid_input(config.model));
+        auto dataset = task["dataset"].get<std::string>();
+        auto seed = task["seed"].get<int>();
+        auto n_fold = task["fold"].get<int>();
+        // Generate the hyperparameters combinations
+        auto combinations = grid.getGrid(dataset);
+        auto [X, y] = datasets.getTensors(dataset);
+        auto states = datasets.getStates(dataset);
+        auto features = datasets.getFeatures(dataset);
+        auto className = datasets.getClassName(dataset);
+        //
+        // Start working on task
+        //
+        Fold* fold;
+        if (config.stratified)
+            fold = new StratifiedKFold(config.n_folds, y, seed);
+        else
+            fold = new KFold(config.n_folds, y.size(0), seed);
+        auto [train, test] = fold->getFold(n_fold);
+        auto train_t = torch::tensor(train);
+        auto test_t = torch::tensor(test);
+        auto X_train = X.index({ "...", train_t });
+        auto y_train = y.index({ train_t });
+        auto X_test = X.index({ "...", test_t });
+        auto y_test = y.index({ test_t });
+        double best_fold_score = 0.0;
+        json best_fold_hyper;
+        for (const auto& hyperparam_line : combinations) {
+            auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line);
+            Fold* nested_fold;
+            if (config.stratified)
+                nested_fold = new StratifiedKFold(config.nested, y_train, seed);
+            else
+                nested_fold = new KFold(config.nested, y_train.size(0), seed);
+            double score = 0.0;
+            for (int n_nested_fold = 0; n_nested_fold < config.nested; n_nested_fold++) {
+                // Nested level fold
+                auto [train_nested, test_nested] = nested_fold->getFold(n_nested_fold);
+                auto train_nested_t = torch::tensor(train_nested);
+                auto test_nested_t = torch::tensor(test_nested);
+                auto X_nested_train = X_train.index({ "...", train_nested_t });
+                auto y_nested_train = y_train.index({ train_nested_t });
+                auto X_nested_test = X_train.index({ "...", test_nested_t });
+                auto y_nested_test = y_train.index({ test_nested_t });
+                // Build classifier with selected hyperparameters
+                auto clf = Models::instance()->create(config.model);
+                auto valid = clf->getValidHyperparameters();
+                hyperparameters.check(valid, dataset);
+                clf->setHyperparameters(hyperparameters.get(dataset));
+                // Train model
+                clf->fit(X_nested_train, y_nested_train, features, className, states);
+                // Test model
+                score += clf->score(X_nested_test, y_nested_test);
+            }
+            delete nested_fold;
+            score /= config.nested;
+            if (score > best_fold_score) {
+                best_fold_score = score;
+                best_fold_hyper = hyperparam_line;
+            }
+        }
+        delete fold;
+        // Build classifier with the best hyperparameters to obtain the final score
+        auto hyperparameters = platform::HyperParameters(datasets.getNames(), best_fold_hyper);
+        auto clf = Models::instance()->create(config.model);
+        auto valid = clf->getValidHyperparameters();
+        hyperparameters.check(valid, dataset);
+        clf->setHyperparameters(best_fold_hyper);
+        clf->fit(X_train, y_train, features, className, states);
+        best_fold_score = clf->score(X_test, y_test);
+        // Save results
+        results[dataset][std::to_string(n_fold)]["score"] = best_fold_score;
+        results[dataset][std::to_string(n_fold)]["hyperparameters"] = best_fold_hyper;
+        results[dataset][std::to_string(n_fold)]["seed"] = seed;
+        results[dataset][std::to_string(n_fold)]["duration"] = timer.getDuration();
+        std::cout << get_color_rank(config_mpi.rank) << "*" << std::flush;
+    }
+    void GridSearch::go_mpi(struct ConfigMPI& config_mpi)
+    {
+        /*
+         * Each task is a json object with the following structure:
+         * {
+         *     "dataset": "dataset_name",
+         *     "seed": # of seed to use,
+         *     "fold": # of fold to process
+         * }
+         *
+         * The overall process consists of these steps:
+         * 1. Manager will broadcast the tasks to all the processes
+         *    1.1 Broadcast the number of tasks
+         *    1.2 Broadcast the length of the following string
+         *    1.3 Broadcast the tasks as a char* string
+         * 2. Workers will receive the tasks and start the process
+         *    2.1 A method will tell each worker the range of tasks to process
+         *    2.2 Each worker will process the tasks and generate the best score for each task
+         * 3. Manager gathers the scores from all the workers and finds out the best hyperparameters for each dataset
+         *    3.1 Obtain the maximum size of the results message of all the workers
+         *    3.2 Gather all the results from the workers into the manager
+         *    3.3 Compile the results from all the workers
+         *    3.4 Filter the best hyperparameters for each dataset
+         */
+        char* msg;
+        int tasks_size;
+        if (config_mpi.rank == config_mpi.manager) {
+            timer.start();
+            auto tasks = build_tasks_mpi();
+            auto tasks_str = tasks.dump();
+            tasks_size = tasks_str.size();
+            msg = new char[tasks_size + 1];
+            strcpy(msg, tasks_str.c_str());
+        }
+        //
+        // 1. Manager will broadcast the tasks to all the processes
+        //
+        MPI_Bcast(&tasks_size, 1, MPI_INT, config_mpi.manager, MPI_COMM_WORLD);
+        if (config_mpi.rank != config_mpi.manager) {
+            msg = new char[tasks_size + 1];
+        }
+        MPI_Bcast(msg, tasks_size + 1, MPI_CHAR, config_mpi.manager, MPI_COMM_WORLD);
+        json tasks = json::parse(msg);
+        delete[] msg;
+        //
+        // 2. All workers will receive the tasks and start the process
+        //
+        int num_tasks = tasks.size();
+        // 2.1 A method will tell each worker the range of tasks to process
+        auto [start, end] = part_range_mpi(num_tasks, config_mpi.n_procs, config_mpi.rank);
+        // 2.2 Each worker will process the tasks and return the best scores obtained
+        auto datasets = Datasets(config.discretize, Paths::datasets());
+        json results;
+        for (int i = start; i < end; ++i) {
+            // Process task
+            process_task_mpi(config_mpi, tasks[i], datasets, results);
+        }
+        int size = results.dump().size() + 1;
+        int max_size = 0;
+        //
+        // 3. Manager gathers the scores from all the workers and finds out the best hyperparameters for each dataset
+        //
+        // 3.1 Obtain the maximum size of the results message of all the workers
+        MPI_Allreduce(&size, &max_size, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);
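+        // Every rank sends a buffer of exactly max_size bytes so MPI_Gather can
+        // use a single fixed stride; the manager then recovers worker i's
+        // NUL-terminated JSON string at offset i * max_size of the receive buffer.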
+        // Assign the memory to the message and initialize it to 0s
+        char* total = NULL;
+        msg = new char[max_size]();
+        strncpy(msg, results.dump().c_str(), size);
+        if (config_mpi.rank == config_mpi.manager) {
+            total = new char[max_size * config_mpi.n_procs];
+        }
+        // 3.2 Gather all the results from the workers into the manager
+        MPI_Gather(msg, max_size, MPI_CHAR, total, max_size, MPI_CHAR, config_mpi.manager, MPI_COMM_WORLD);
+        delete[] msg;
+        if (config_mpi.rank == config_mpi.manager) {
+            std::cout << Colors::RESET() << "|" << std::endl;
+            json total_results;
+            json best_results;
+            // 3.3 Compile the results from all the workers
+            for (int i = 0; i < config_mpi.n_procs; ++i) {
+                json partial_results = json::parse(total + i * max_size);
+                for (auto& [dataset, folds] : partial_results.items()) {
+                    for (auto& [fold, result] : folds.items()) {
+                        total_results[dataset][fold] = result;
+                    }
+                }
+            }
+            delete[] total;
+            // 3.4 Filter the best hyperparameters for each dataset
+            auto grid = GridData(Paths::grid_input(config.model));
+            for (auto& [dataset, folds] : total_results.items()) {
+                double best_score = 0.0;
+                double duration = 0.0;
+                json best_hyper;
+                for (auto& [fold, result] : folds.items()) {
+                    duration += result["duration"].get<double>();
+                    if (result["score"] > best_score) {
+                        best_score = result["score"];
+                        best_hyper = result["hyperparameters"];
+                    }
+                }
+                auto timer = Timer();
+                json result = {
+                    { "score", best_score },
+                    { "hyperparameters", best_hyper },
+                    { "date", get_date() + " " + get_time() },
+                    { "grid", grid.getInputGrid(dataset) },
+                    { "duration", timer.translate2String(duration) }
+                };
+                best_results[dataset] = result;
+            }
+            save(best_results);
+        }
+    }
     void GridSearch::go()
     {
         timer.start();
@@ -271,39 +559,6 @@ namespace platform {
         }
         return { goatScore, goatHyperparameters };
     }
-    vector<string> GridSearch::processDatasets(Datasets& datasets)
-    {
-        // Load datasets
-        auto datasets_names = datasets.getNames();
-        if (config.continue_from != NO_CONTINUE()) {
-            // Continue previous execution:
-            if (std::find(datasets_names.begin(), datasets_names.end(), config.continue_from) == datasets_names.end()) {
-                throw std::invalid_argument("Dataset " + config.continue_from + " not found");
-            }
-            // Remove datasets already processed
-            vector< string >::iterator it = datasets_names.begin();
-            while (it != datasets_names.end()) {
-                if (*it != config.continue_from) {
-                    it = datasets_names.erase(it);
-                } else {
-                    if (config.only)
-                        ++it;
-                    else
-                        break;
-                }
-            }
-        }
-        // Exclude datasets
-        for (const auto& name : config.excluded) {
-            auto dataset = name.get<std::string>();
-            auto it = std::find(datasets_names.begin(), datasets_names.end(), dataset);
-            if (it == datasets_names.end()) {
-                throw std::invalid_argument("Dataset " + dataset + " already excluded or doesn't exist!");
-            }
-            datasets_names.erase(it);
-        }
-        return datasets_names;
-    }
     json GridSearch::initializeResults()
     {
         // Load previous results
diff --git a/src/Platform/GridSearch.h b/src/Platform/GridSearch.h
index e325ca5..c00b2ee 100644
--- a/src/Platform/GridSearch.h
+++ b/src/Platform/GridSearch.h
@@ -2,6 +2,7 @@
 #define GRIDSEARCH_H
 #include <string>
 #include <vector>
+#include <mpi.h>
 #include <nlohmann/json.hpp>
 #include "Datasets.h"
 #include "HyperParameters.h"
@@ -24,10 +25,18 @@ namespace platform {
         json excluded;
         std::vector<int> seeds;
     };
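+    // MPI run-time context: this process' rank, the total number of processes
+    // and which rank acts as the manager (filled in b_grid.cc after MPI_Init)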
+    struct ConfigMPI {
+        int rank;
+        int n_procs;
+        int manager;
+    };
     class GridSearch {
     public:
         explicit GridSearch(struct ConfigGrid& config);
         void go();
+        void go_mpi(struct ConfigMPI& config_mpi);
         ~GridSearch() = default;
         json getResults();
         static inline std::string NO_CONTINUE() { return "NO_CONTINUE"; }
@@ -38,6 +47,9 @@ namespace platform {
         pair<double, json> processFileSingle(std::string fileName, Datasets& datasets, std::vector<json>& combinations);
         pair<double, json> processFileNested(std::string fileName, Datasets& datasets, std::vector<json>& combinations);
         struct ConfigGrid config;
+        pair<int, int> part_range_mpi(int n_tasks, int nprocs, int rank);
+        json build_tasks_mpi();
+        void process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results);
         Timer timer; // used to measure the time of the whole process
     };
 } /* namespace platform */
diff --git a/src/Platform/Timer.h b/src/Platform/Timer.h
index b44d629..dd10d94 100644
--- a/src/Platform/Timer.h
+++ b/src/Platform/Timer.h
@@ -28,10 +28,16 @@ namespace platform {
         std::string getDurationString(bool lapse = false)
         {
             double duration = lapse ? getLapse() : getDuration();
+            return translate2String(duration);
+        }
+        std::string translate2String(double duration)
+        {
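+            // Scale seconds into hours, minutes or seconds and render as "x.xx u";
+            // no setw padding here, callers align the field themselves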
"m" : "s"; std::stringstream ss; - ss << std::setw(7) << std::setprecision(2) << std::fixed << durationShow << " " << durationUnit << " "; + ss << std::setprecision(2) << std::fixed << durationShow << " " << durationUnit; return ss.str(); } }; diff --git a/src/Platform/b_grid.cc b/src/Platform/b_grid.cc index a5af2a6..d870353 100644 --- a/src/Platform/b_grid.cc +++ b/src/Platform/b_grid.cc @@ -2,6 +2,7 @@ #include #include #include +#include #include "DotEnv.h" #include "Models.h" #include "modelRegister.h" @@ -31,6 +32,7 @@ void manageArguments(argparse::ArgumentParser& program) group.add_argument("--report").help("Report the computed hyperparameters").default_value(false).implicit_value(true); group.add_argument("--compute").help("Perform computation of the grid output hyperparameters").default_value(false).implicit_value(true); program.add_argument("--discretize").help("Discretize input datasets").default_value((bool)stoi(env.get("discretize"))).implicit_value(true); + program.add_argument("--mpi").help("Use MPI computing grid").default_value(false).implicit_value(true); program.add_argument("--stratified").help("If Stratified KFold is to be done").default_value((bool)stoi(env.get("stratified"))).implicit_value(true); program.add_argument("--quiet").help("Don't display detailed progress").default_value(false).implicit_value(true); program.add_argument("--continue").help("Continue computing from that dataset").default_value(platform::GridSearch::NO_CONTINUE()); @@ -131,14 +133,13 @@ void list_results(json& results, std::string& model) std::cout << color; std::cout << std::setw(3) << std::right << index++ << " "; std::cout << left << setw(spaces) << key << " " << value["date"].get() - << " " << setw(8) << value["duration"] << " " << setw(8) << setprecision(6) + << " " << setw(8) << right << value["duration"].get() << " " << setw(8) << setprecision(6) << fixed << right << value["score"].get() << " " << value["hyperparameters"].dump() << std::endl; odd = !odd; } std::cout << Colors::RESET() << std::endl; } - /* * Main */ @@ -170,6 +171,11 @@ int main(int argc, char** argv) } auto excluded = program.get("exclude"); config.excluded = json::parse(excluded); + if (program.get("mpi")) { + if (!compute || config.nested == 0) { + throw std::runtime_error("Cannot use --mpi without --compute or without --nested"); + } + } } catch (const exception& err) { cerr << err.what() << std::endl; @@ -189,8 +195,23 @@ int main(int argc, char** argv) list_dump(config.model); } else { if (compute) { - grid_search.go(); - std::cout << "Process took " << timer.getDurationString() << std::endl; + if (program.get("mpi")) { + struct platform::ConfigMPI mpi_config; + mpi_config.manager = 0; // which process is the manager + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_config.rank); + MPI_Comm_size(MPI_COMM_WORLD, &mpi_config.n_procs); + grid_search.go_mpi(mpi_config); + if (mpi_config.rank == mpi_config.manager) { + auto results = grid_search.getResults(); + list_results(results, config.model); + std::cout << "Process took " << timer.getDurationString() << std::endl; + } + MPI_Finalize(); + } else { + grid_search.go(); + std::cout << "Process took " << timer.getDurationString() << std::endl; + } } else { // List results auto results = grid_search.getResults();