From e0b7b2d3168b6c7d312274b4c076cfbb45346738 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Monta=C3=B1ana=20G=C3=B3mez?= Date: Fri, 22 Dec 2023 12:47:13 +0100 Subject: [PATCH] Set structure & protocol of producer-consumer --- src/Platform/GridSearch.cc | 271 ++++++++++++++++++++++++++----------- src/Platform/GridSearch.h | 9 ++ 2 files changed, 202 insertions(+), 78 deletions(-) diff --git a/src/Platform/GridSearch.cc b/src/Platform/GridSearch.cc index 76c4b4c..f8fd2ee 100644 --- a/src/Platform/GridSearch.cc +++ b/src/Platform/GridSearch.cc @@ -149,88 +149,120 @@ namespace platform { auto colors = { Colors::RED(), Colors::GREEN(), Colors::BLUE(), Colors::MAGENTA(), Colors::CYAN() }; return *(colors.begin() + rank % colors.size()); } - void GridSearch::process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results) + + void GridSearch::go_producer_consumer(struct ConfigMPI& config_mpi) { - // Process the task and store the result in the results json - Timer timer; - timer.start(); - auto grid = GridData(Paths::grid_input(config.model)); - auto dataset = task["dataset"].get(); - auto seed = task["seed"].get(); - auto n_fold = task["fold"].get(); - // Generate the hyperparamters combinations - auto combinations = grid.getGrid(dataset); - auto [X, y] = datasets.getTensors(dataset); - auto states = datasets.getStates(dataset); - auto features = datasets.getFeatures(dataset); - auto className = datasets.getClassName(dataset); + /* + * Each task is a json object with the following structure: + * { + * "dataset": "dataset_name", + * "seed": # of seed to use, + * "model": "model_name", + * "Fold": # of fold to process + * } + * + * The overall process consists in these steps: + * 0. Create the MPI result type & tasks + * 0.1 Create the MPI result type + * 0.2 Manager creates the tasks + * 1. Manager will broadcast the tasks to all the processes + * 1.1 Broadcast the number of tasks + * 1.2 Broadcast the length of the following string + * 1.2 Broadcast the tasks as a char* string + * 2. Workers will receive the tasks and start the process + * 2.1 A method will tell each worker the range of tasks to process + * 2.2 Each worker will process the tasks and generate the best score for each task + * 3. Manager gather the scores from all the workers and find out the best hyperparameters for each dataset + * 3.1 Obtain the maximum size of the results message of all the workers + * 3.2 Gather all the results from the workers into the manager + * 3.3 Compile the results from all the workers + * 3.4 Filter the best hyperparameters for each dataset + */ // - // Start working on task + // 0.1 Create the MPI result type // - Fold* fold; - if (config.stratified) - fold = new StratifiedKFold(config.n_folds, y, seed); - else - fold = new KFold(config.n_folds, y.size(0), seed); - auto [train, test] = fold->getFold(n_fold); - auto train_t = torch::tensor(train); - auto test_t = torch::tensor(test); - auto X_train = X.index({ "...", train_t }); - auto y_train = y.index({ train_t }); - auto X_test = X.index({ "...", test_t }); - auto y_test = y.index({ test_t }); - auto num = 0; - double best_fold_score = 0.0; - json best_fold_hyper; - for (const auto& hyperparam_line : combinations) { - auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line); - Fold* nested_fold; - if (config.stratified) - nested_fold = new StratifiedKFold(config.nested, y_train, seed); - else - nested_fold = new KFold(config.nested, y_train.size(0), seed); - double score = 0.0; - for (int n_nested_fold = 0; n_nested_fold < config.nested; n_nested_fold++) { - // Nested level fold - auto [train_nested, test_nested] = nested_fold->getFold(n_nested_fold); - auto train_nested_t = torch::tensor(train_nested); - auto test_nested_t = torch::tensor(test_nested); - auto X_nested_train = X_train.index({ "...", train_nested_t }); - auto y_nested_train = y_train.index({ train_nested_t }); - auto X_nested_test = X_train.index({ "...", test_nested_t }); - auto y_nested_test = y_train.index({ test_nested_t }); - // Build Classifier with selected hyperparameters - auto clf = Models::instance()->create(config.model); - auto valid = clf->getValidHyperparameters(); - hyperparameters.check(valid, dataset); - clf->setHyperparameters(hyperparameters.get(dataset)); - // Train model - clf->fit(X_nested_train, y_nested_train, features, className, states); - // Test model - score += clf->score(X_nested_test, y_nested_test); - } - delete nested_fold; - score /= config.nested; - if (score > best_fold_score) { - best_fold_score = score; - best_fold_hyper = hyperparam_line; - } + Task_Result result; + MPI_Datatype MPI_Result; + MPI_Datatype type[3] = { MPI_UNSIGNED, MPI_UNSIGNED, MPI_DOUBLE }; + int blocklen[3] = { 1, 1, 1 }; + MPI_Aint disp[3]; + disp[0] = offsetof(struct MPI_Result, idx_dataset); + disp[1] = offsetof(struct MPI_Result, idx_combination); + disp[2] = offsetof(struct MPI_Result, score); + MPI_Type_create_struct(3, blocklen, disp, type, &MPI_Result); + MPI_Type_commit(&MPI_Result); + // + // 0.2 Manager creates the tasks + // + char* msg; + if (config_mpi.rank == config_mpi.manager) { + timer.start(); + auto tasks = build_tasks_mpi(); + auto tasks_str = tasks.dump(); + tasks_size = tasks_str.size(); + msg = new char[tasks_size + 1]; + strcpy(msg, tasks_str.c_str()); + } + // + // 1. Manager will broadcast the tasks to all the processes + // + MPI_Bcast(&tasks_size, 1, MPI_INT, config_mpi.manager, MPI_COMM_WORLD); + if (config_mpi.rank != config_mpi.manager) { + msg = new char[tasks_size + 1]; + } + MPI_Bcast(msg, tasks_size + 1, MPI_CHAR, config_mpi.manager, MPI_COMM_WORLD); + json tasks = json::parse(msg); + delete[] msg; + // + // 2. All Workers will receive the tasks and start the process + // + if (config_mpi.rank == config_mpi.manager) { + producer(tasks, &MPI_Result); + } else { + consumer(tasks, &MPI_Result); + } + } + void producer(json& tasks, MPI_Datatpe& MPI_Result) + { + Task_Result result; + int num_tasks = tasks.size(); + for (int i = 0; i < num_tasks; ++i) { + MPI_Status status; + MPI_recv(&result, 1, MPI_Result, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); + if (status.MPI_TAG == TAG_RESULT) { + //Store result + + } + MPI_Send(&i, 1, MPI_INT, status.MPI_SOURCE, TAG_TASK, MPI_COMM_WORLD); + } + // Send end message to all workers + for (int i = 0; i < config_mpi.n_procs; ++i) { + MPI_Status status; + MPI_recv(&result, 1, MPI_Result, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status); + if (status.MPI_TAG == TAG_RESULT) { + //Store result + + } + MPI_Send(&i, 1, MPI_INT, status.MPI_SOURCE, TAG_END, MPI_COMM_WORLD); + } + } + void consumer(json& tasks, MPI_Datatpe& MPI_Result) + { + Task_Result result; + // Anounce to the producer + MPI_Send(&result, 1, MPI_Result, config_mpi.manager, TAG_QUERY, MPI_COMM_WORLD); + int task; + while (true) { + MPI_Status status; + MPI_recv(&task, 1, MPI_INT, config_mpi.manager, MPI_ANY_TAG, MPI_COMM_WORLD, &status); + if (status.MPI_TAG == TAG_END) { + break; + } + // Process task + process_task_mpi(config_mpi, task, datasets, results); + // Send result to producer + MPI_Send(&result, 1, MPI_Result, config_mpi.manager, TAG_RESULT, MPI_COMM_WORLD); } - delete fold; - // Build Classifier with the best hyperparameters to obtain the best score - auto hyperparameters = platform::HyperParameters(datasets.getNames(), best_fold_hyper); - auto clf = Models::instance()->create(config.model); - auto valid = clf->getValidHyperparameters(); - hyperparameters.check(valid, dataset); - clf->setHyperparameters(best_fold_hyper); - clf->fit(X_train, y_train, features, className, states); - best_fold_score = clf->score(X_test, y_test); - // Save results - results[dataset][std::to_string(n_fold)]["score"] = best_fold_score; - results[dataset][std::to_string(n_fold)]["hyperparameters"] = best_fold_hyper; - results[dataset][std::to_string(n_fold)]["seed"] = seed; - results[dataset][std::to_string(n_fold)]["duration"] = timer.getDuration(); - std::cout << get_color_rank(config_mpi.rank) << "*" << std::flush; } void GridSearch::go_mpi(struct ConfigMPI& config_mpi) { @@ -555,6 +587,89 @@ namespace platform { } return { goatScore, goatHyperparameters }; } + void GridSearch::process_task_mpi(struct ConfigMPI& config_mpi, json& task, Datasets& datasets, json& results) + { + // Process the task and store the result in the results json + Timer timer; + timer.start(); + auto grid = GridData(Paths::grid_input(config.model)); + auto dataset = task["dataset"].get(); + auto seed = task["seed"].get(); + auto n_fold = task["fold"].get(); + // Generate the hyperparamters combinations + auto combinations = grid.getGrid(dataset); + auto [X, y] = datasets.getTensors(dataset); + auto states = datasets.getStates(dataset); + auto features = datasets.getFeatures(dataset); + auto className = datasets.getClassName(dataset); + // + // Start working on task + // + Fold* fold; + if (config.stratified) + fold = new StratifiedKFold(config.n_folds, y, seed); + else + fold = new KFold(config.n_folds, y.size(0), seed); + auto [train, test] = fold->getFold(n_fold); + auto train_t = torch::tensor(train); + auto test_t = torch::tensor(test); + auto X_train = X.index({ "...", train_t }); + auto y_train = y.index({ train_t }); + auto X_test = X.index({ "...", test_t }); + auto y_test = y.index({ test_t }); + auto num = 0; + double best_fold_score = 0.0; + json best_fold_hyper; + for (const auto& hyperparam_line : combinations) { + auto hyperparameters = platform::HyperParameters(datasets.getNames(), hyperparam_line); + Fold* nested_fold; + if (config.stratified) + nested_fold = new StratifiedKFold(config.nested, y_train, seed); + else + nested_fold = new KFold(config.nested, y_train.size(0), seed); + double score = 0.0; + for (int n_nested_fold = 0; n_nested_fold < config.nested; n_nested_fold++) { + // Nested level fold + auto [train_nested, test_nested] = nested_fold->getFold(n_nested_fold); + auto train_nested_t = torch::tensor(train_nested); + auto test_nested_t = torch::tensor(test_nested); + auto X_nested_train = X_train.index({ "...", train_nested_t }); + auto y_nested_train = y_train.index({ train_nested_t }); + auto X_nested_test = X_train.index({ "...", test_nested_t }); + auto y_nested_test = y_train.index({ test_nested_t }); + // Build Classifier with selected hyperparameters + auto clf = Models::instance()->create(config.model); + auto valid = clf->getValidHyperparameters(); + hyperparameters.check(valid, dataset); + clf->setHyperparameters(hyperparameters.get(dataset)); + // Train model + clf->fit(X_nested_train, y_nested_train, features, className, states); + // Test model + score += clf->score(X_nested_test, y_nested_test); + } + delete nested_fold; + score /= config.nested; + if (score > best_fold_score) { + best_fold_score = score; + best_fold_hyper = hyperparam_line; + } + } + delete fold; + // Build Classifier with the best hyperparameters to obtain the best score + auto hyperparameters = platform::HyperParameters(datasets.getNames(), best_fold_hyper); + auto clf = Models::instance()->create(config.model); + auto valid = clf->getValidHyperparameters(); + hyperparameters.check(valid, dataset); + clf->setHyperparameters(best_fold_hyper); + clf->fit(X_train, y_train, features, className, states); + best_fold_score = clf->score(X_test, y_test); + // Save results + results[dataset][std::to_string(n_fold)]["score"] = best_fold_score; + results[dataset][std::to_string(n_fold)]["hyperparameters"] = best_fold_hyper; + results[dataset][std::to_string(n_fold)]["seed"] = seed; + results[dataset][std::to_string(n_fold)]["duration"] = timer.getDuration(); + std::cout << get_color_rank(config_mpi.rank) << "*" << std::flush; + } json GridSearch::initializeResults() { // Load previous results diff --git a/src/Platform/GridSearch.h b/src/Platform/GridSearch.h index c00b2ee..1a868a8 100644 --- a/src/Platform/GridSearch.h +++ b/src/Platform/GridSearch.h @@ -30,6 +30,15 @@ namespace platform { int n_procs; int manager; }; + typedef struct { + uint idx_dataset; + uint idx_combination; + double score; + } Task_Result; + const TAG_QUERY = 1; + const TAG_RESULT = 2; + const TAG_TASK = 3; + const TAG_END = 4; class GridSearch { public: explicit GridSearch(struct ConfigGrid& config);