diff --git a/bayesnet/ensembles/Ensemble.cc b/bayesnet/ensembles/Ensemble.cc index 68f3ee5..4b71a16 100644 --- a/bayesnet/ensembles/Ensemble.cc +++ b/bayesnet/ensembles/Ensemble.cc @@ -3,14 +3,13 @@ // SPDX-FileType: SOURCE // SPDX-License-Identifier: MIT // *************************************************************** - #include "Ensemble.h" +#include "bayesnet/utils/CountingSemaphore.h" namespace bayesnet { Ensemble::Ensemble(bool predict_voting) : Classifier(Network()), n_models(0), predict_voting(predict_voting) { - }; const std::string ENSEMBLE_NOT_FITTED = "Ensemble has not been fitted"; void Ensemble::trainModel(const torch::Tensor& weights, const Smoothing_t smoothing) @@ -85,17 +84,9 @@ namespace bayesnet { { auto n_states = models[0]->getClassNumStates(); torch::Tensor y_pred = torch::zeros({ X.size(1), n_states }, torch::kFloat32); - auto threads{ std::vector() }; - std::mutex mtx; for (auto i = 0; i < n_models; ++i) { - threads.push_back(std::thread([&, i]() { - auto ypredict = models[i]->predict_proba(X); - std::lock_guard lock(mtx); - y_pred += ypredict * significanceModels[i]; - })); - } - for (auto& thread : threads) { - thread.join(); + auto ypredict = models[i]->predict_proba(X); + y_pred += ypredict * significanceModels[i]; } auto sum = std::reduce(significanceModels.begin(), significanceModels.end()); y_pred /= sum; @@ -105,23 +96,15 @@ namespace bayesnet { { auto n_states = models[0]->getClassNumStates(); std::vector> y_pred(X[0].size(), std::vector(n_states, 0.0)); - auto threads{ std::vector() }; - std::mutex mtx; for (auto i = 0; i < n_models; ++i) { - threads.push_back(std::thread([&, i]() { - auto ypredict = models[i]->predict_proba(X); - assert(ypredict.size() == y_pred.size()); - assert(ypredict[0].size() == y_pred[0].size()); - std::lock_guard lock(mtx); - // Multiply each prediction by the significance of the model and then add it to the final prediction - for (auto j = 0; j < ypredict.size(); ++j) { - std::transform(y_pred[j].begin(), y_pred[j].end(), ypredict[j].begin(), y_pred[j].begin(), - [significanceModels = significanceModels[i]](double x, double y) { return x + y * significanceModels; }); - } - })); - } - for (auto& thread : threads) { - thread.join(); + auto ypredict = models[i]->predict_proba(X); + assert(ypredict.size() == y_pred.size()); + assert(ypredict[0].size() == y_pred[0].size()); + // Multiply each prediction by the significance of the model and then add it to the final prediction + for (auto j = 0; j < ypredict.size(); ++j) { + std::transform(y_pred[j].begin(), y_pred[j].end(), ypredict[j].begin(), y_pred[j].begin(), + [significanceModels = significanceModels[i]](double x, double y) { return x + y * significanceModels; }); + } } auto sum = std::reduce(significanceModels.begin(), significanceModels.end()); //Divide each element of the prediction by the sum of the significances @@ -141,17 +124,9 @@ namespace bayesnet { { // Build a m x n_models tensor with the predictions of each model torch::Tensor y_pred = torch::zeros({ X.size(1), n_models }, torch::kInt32); - auto threads{ std::vector() }; - std::mutex mtx; for (auto i = 0; i < n_models; ++i) { - threads.push_back(std::thread([&, i]() { - auto ypredict = models[i]->predict(X); - std::lock_guard lock(mtx); - y_pred.index_put_({ "...", i }, ypredict); - })); - } - for (auto& thread : threads) { - thread.join(); + auto ypredict = models[i]->predict(X); + y_pred.index_put_({ "...", i }, ypredict); } return voting(y_pred); } diff --git a/bayesnet/network/CountingSemaphore.h b/bayesnet/network/CountingSemaphore.h deleted file mode 100644 index 6f65e71..0000000 --- a/bayesnet/network/CountingSemaphore.h +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef COUNTING_SEMAPHORE_H -#define COUNTING_SEMAPHORE_H -#include -#include -class CountingSemaphore { -public: - explicit CountingSemaphore(size_t max_count) : max_count_(max_count), count_(max_count) {} - - // Acquires a permit, blocking if necessary until one becomes available - void acquire() - { - std::unique_lock lock(mtx_); - cv_.wait(lock, [this]() { return count_ > 0; }); - --count_; - } - - // Releases a permit, potentially waking up a blocked acquirer - void release() - { - std::lock_guard lock(mtx_); - ++count_; - if (count_ <= max_count_) { - cv_.notify_one(); - } - } - -private: - std::mutex mtx_; - std::condition_variable cv_; - size_t max_count_; - size_t count_; -}; -#endif \ No newline at end of file diff --git a/bayesnet/network/Network.cc b/bayesnet/network/Network.cc index aa9eb16..8aee16d 100644 --- a/bayesnet/network/Network.cc +++ b/bayesnet/network/Network.cc @@ -8,22 +8,16 @@ #include #include #include -#include "CountingSemaphore.h" #include "Network.h" #include "bayesnet/utils/bayesnetUtils.h" +#include "bayesnet/utils/CountingSemaphore.h" +#include namespace bayesnet { - Network::Network() : fitted{ false }, maxThreads{ 0.95 }, classNumStates{ 0 } + Network::Network() : fitted{ false }, classNumStates{ 0 } { - maxThreadsRunning = std::max(1, static_cast(std::thread::hardware_concurrency() * maxThreads)); - maxThreadsRunning = std::min(maxThreadsRunning, static_cast(std::thread::hardware_concurrency())); - } - Network::Network(float maxT) : fitted{ false }, maxThreads{ maxT }, classNumStates{ 0 } - { - maxThreadsRunning = std::max(1, static_cast(std::thread::hardware_concurrency() * maxThreads)); - maxThreadsRunning = std::min(maxThreadsRunning, static_cast(std::thread::hardware_concurrency())); } Network::Network(const Network& other) : features(other.features), className(other.className), classNumStates(other.getClassNumStates()), - maxThreads(other.getMaxThreads()), fitted(other.fitted), samples(other.samples), maxThreadsRunning(other.maxThreadsRunning) + fitted(other.fitted), samples(other.samples) { if (samples.defined()) samples = samples.clone(); @@ -40,10 +34,6 @@ namespace bayesnet { nodes.clear(); samples = torch::Tensor(); } - float Network::getMaxThreads() const - { - return maxThreads; - } torch::Tensor& Network::getSamples() { return samples; @@ -196,9 +186,11 @@ namespace bayesnet { { setStates(states); std::vector threads; - CountingSemaphore semaphore(maxThreadsRunning); + auto& semaphore = CountingSemaphore::getInstance(); const double n_samples = static_cast(samples.size(1)); - auto worker = [&](std::pair>& node) { + auto worker = [&](std::pair>& node, int i) { + std::string threadName = "FitWorker-" + std::to_string(i); + pthread_setname_np(pthread_self(), threadName.c_str()); semaphore.acquire(); double numStates = static_cast(node.second->getNumStates()); double smoothing_factor = 0.0; @@ -218,8 +210,9 @@ namespace bayesnet { node.second->computeCPT(samples, features, smoothing_factor, weights); semaphore.release(); }; + int i = 0; for (auto& node : nodes) { - threads.emplace_back(worker, std::ref(node)); + threads.emplace_back(worker, std::ref(node), i++); } for (auto& thread : threads) { thread.join(); @@ -345,12 +338,21 @@ namespace bayesnet { } std::vector Network::exactInference(std::map& evidence) { + + + //Implementar una cache para acelerar la inferencia. + // Cambiar la estrategia de crear hilos en la inferencia (por nodos como en fit?) + + + std::vector result(classNumStates, 0.0); std::vector threads; std::mutex mtx; - CountingSemaphore semaphore(maxThreadsRunning); + auto& semaphore = CountingSemaphore::getInstance(); auto worker = [&](int i) { semaphore.acquire(); + std::string threadName = "InferenceWorker-" + std::to_string(i); + pthread_setname_np(pthread_self(), threadName.c_str()); auto completeEvidence = std::map(evidence); completeEvidence[getClassName()] = i; double factor = computeFactor(completeEvidence); diff --git a/bayesnet/network/Network.h b/bayesnet/network/Network.h index a14540d..1aea190 100644 --- a/bayesnet/network/Network.h +++ b/bayesnet/network/Network.h @@ -56,8 +56,6 @@ namespace bayesnet { private: std::map> nodes; bool fitted; - float maxThreads = 0.95; // Coefficient to multiply by the number of threads available - int maxThreadsRunning; // Effective max number of threads running int classNumStates; std::vector features; // Including classname std::string className; diff --git a/bayesnet/utils/CountingSemaphore.h b/bayesnet/utils/CountingSemaphore.h new file mode 100644 index 0000000..25d1ac7 --- /dev/null +++ b/bayesnet/utils/CountingSemaphore.h @@ -0,0 +1,46 @@ +#ifndef COUNTING_SEMAPHORE_H +#define COUNTING_SEMAPHORE_H +#include +#include +#include +#include +#include +#include +#include + +class CountingSemaphore { +public: + static CountingSemaphore& getInstance() + { + static CountingSemaphore instance; + return instance; + } + // Delete copy constructor and assignment operator + CountingSemaphore(const CountingSemaphore&) = delete; + CountingSemaphore& operator=(const CountingSemaphore&) = delete; + void acquire() + { + std::unique_lock lock(mtx_); + cv_.wait(lock, [this]() { return count_ > 0; }); + --count_; + } + void release() + { + std::lock_guard lock(mtx_); + ++count_; + if (count_ <= max_count_) { + cv_.notify_one(); + } + } +private: + CountingSemaphore() + : max_count_(std::max(1u, static_cast(0.95 * std::thread::hardware_concurrency()))), + count_(max_count_) + { + } + std::mutex mtx_; + std::condition_variable cv_; + const uint max_count_; + uint count_; +}; +#endif \ No newline at end of file