diff --git a/src/Platform/CMakeLists.txt b/src/Platform/CMakeLists.txt
index d35989f..e475b16 100644
--- a/src/Platform/CMakeLists.txt
+++ b/src/Platform/CMakeLists.txt
@@ -7,6 +7,7 @@ include_directories(${BayesNet_SOURCE_DIR}/lib/argparse/include)
 include_directories(${BayesNet_SOURCE_DIR}/lib/json/include)
 include_directories(${BayesNet_SOURCE_DIR}/lib/libxlsxwriter/include)
 include_directories(${Python3_INCLUDE_DIRS})
+include_directories(${MPI_CXX_INCLUDE_DIRS})
 
 add_executable(b_best b_best.cc BestResults.cc Result.cc Statistics.cc BestResultsExcel.cc ReportExcel.cc ReportBase.cc Datasets.cc Dataset.cc ExcelFile.cc)
 add_executable(b_grid b_grid.cc GridSearch.cc GridData.cc HyperParameters.cc Folding.cc Datasets.cc Dataset.cc)
@@ -15,7 +16,7 @@ add_executable(b_main b_main.cc Folding.cc Experiment.cc Datasets.cc Dataset.cc
 add_executable(b_manage b_manage.cc Results.cc ManageResults.cc CommandParser.cc Result.cc ReportConsole.cc ReportExcel.cc ReportBase.cc Datasets.cc Dataset.cc ExcelFile.cc)
 
 target_link_libraries(b_best Boost::boost "${XLSXWRITER_LIB}" "${TORCH_LIBRARIES}" ArffFiles mdlp)
-target_link_libraries(b_grid BayesNet PyWrap)
+target_link_libraries(b_grid BayesNet PyWrap ${MPI_CXX_LIBRARIES})
 target_link_libraries(b_list ArffFiles mdlp "${TORCH_LIBRARIES}")
 target_link_libraries(b_main BayesNet ArffFiles mdlp "${TORCH_LIBRARIES}" PyWrap)
 target_link_libraries(b_manage "${TORCH_LIBRARIES}" "${XLSXWRITER_LIB}" ArffFiles mdlp)
\ No newline at end of file
diff --git a/src/Platform/GridSearch.cc b/src/Platform/GridSearch.cc
index 9f91c6a..7b629ba 100644
--- a/src/Platform/GridSearch.cc
+++ b/src/Platform/GridSearch.cc
@@ -63,6 +63,126 @@ namespace platform {
             return Colors::RESET();
         }
     }
+    json GridSearch::buildTasks()
+    {
+        auto result = json::array();
+        auto datasets = Datasets(false, Paths::datasets());
+        auto datasets_names = datasets.getNames();
+        auto grid = GridData(Paths::grid_input(config.model));
+        for (const auto& dataset : datasets_names) {
+            for (const auto& seed : config.seeds) {
+                auto combinations = grid.getGrid(dataset);
+                for (const auto& hyperparam_line : combinations) {
+                    auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line);
+                    json task = {
+                        { "dataset", dataset },
+                        { "seed", seed },
+                        { "hyperparameters", hyperparameters.get(dataset) }
+                    };
+                    result.push_back(task);
+                }
+            }
+        }
+        return result;
+    }
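+    // Illustrative example (not part of the original patch): each task produced by
+    // buildTasks() is a json object such as
+    //   { "dataset": "iris", "seed": 271, "hyperparameters": { ... } }
+    // with one task per (dataset, seed, hyperparameter combination) triple.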
+    std::pair<int, int> GridSearch::partRange(int n_tasks, int nprocs, int rank)
+    {
+        int assigned = 0;
+        int remainder = n_tasks % nprocs;
+        int start = 0;
+        if (rank < remainder) {
+            assigned = n_tasks / nprocs + 1;
+        } else {
+            assigned = n_tasks / nprocs;
+            start = remainder;
+        }
+        start += rank * assigned;
+        int end = start + assigned;
+        if (rank == nprocs - 1) {
+            end = n_tasks;
+        }
+        return { start, end };
+    }
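+    // Illustrative example (not part of the original patch): with n_tasks = 10 and
+    // nprocs = 4, partRange yields the ranges [0, 3), [3, 6), [6, 8) and [8, 10).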
"Models.h" #include "modelRegister.h" @@ -164,7 +165,6 @@ int main(int argc, char** argv) argparse::ArgumentParser program("b_grid"); manageArguments(program); struct platform::ConfigGrid config; - struct platform::ConfigMPI mpi_config; bool dump, compute; try { program.parse_args(argc, argv); @@ -213,11 +213,13 @@ int main(int argc, char** argv) } else { if (compute) { if (program.get("mpi")) { - MPI_Init(nullptr, nullptr); - MPI_Comm_rank(MPI_COMM_WORLD, &config.rank); - MPI_Comm_size(MPI_COMM_WORLD, &config.size); - grid_search.go_mpi(); - MPI_Finzalize(); + struct platform::ConfigMPI mpi_config; + mpi_config.manager = 0; // which process is the manager + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_config.rank); + MPI_Comm_size(MPI_COMM_WORLD, &mpi_config.n_procs); + grid_search.go_MPI(mpi_config); + MPI_Finalize(); } else { grid_search.go(); std::cout << "Process took " << timer.getDurationString() << std::endl;