Refactor CountingSemaphore as singleton

This commit is contained in:
Ricardo Montañana Gómez 2024-06-21 09:30:24 +02:00
parent 716748e18c
commit 02bcab01be
Signed by: rmontanana
GPG Key ID: 46064262FD9A7ADE
5 changed files with 79 additions and 91 deletions

View File

@ -3,14 +3,13 @@
// SPDX-FileType: SOURCE // SPDX-FileType: SOURCE
// SPDX-License-Identifier: MIT // SPDX-License-Identifier: MIT
// *************************************************************** // ***************************************************************
#include "Ensemble.h" #include "Ensemble.h"
#include "bayesnet/utils/CountingSemaphore.h"
namespace bayesnet { namespace bayesnet {
Ensemble::Ensemble(bool predict_voting) : Classifier(Network()), n_models(0), predict_voting(predict_voting) Ensemble::Ensemble(bool predict_voting) : Classifier(Network()), n_models(0), predict_voting(predict_voting)
{ {
}; };
const std::string ENSEMBLE_NOT_FITTED = "Ensemble has not been fitted"; const std::string ENSEMBLE_NOT_FITTED = "Ensemble has not been fitted";
void Ensemble::trainModel(const torch::Tensor& weights, const Smoothing_t smoothing) void Ensemble::trainModel(const torch::Tensor& weights, const Smoothing_t smoothing)
@ -85,17 +84,9 @@ namespace bayesnet {
{ {
auto n_states = models[0]->getClassNumStates(); auto n_states = models[0]->getClassNumStates();
torch::Tensor y_pred = torch::zeros({ X.size(1), n_states }, torch::kFloat32); torch::Tensor y_pred = torch::zeros({ X.size(1), n_states }, torch::kFloat32);
auto threads{ std::vector<std::thread>() };
std::mutex mtx;
for (auto i = 0; i < n_models; ++i) { for (auto i = 0; i < n_models; ++i) {
threads.push_back(std::thread([&, i]() { auto ypredict = models[i]->predict_proba(X);
auto ypredict = models[i]->predict_proba(X); y_pred += ypredict * significanceModels[i];
std::lock_guard<std::mutex> lock(mtx);
y_pred += ypredict * significanceModels[i];
}));
}
for (auto& thread : threads) {
thread.join();
} }
auto sum = std::reduce(significanceModels.begin(), significanceModels.end()); auto sum = std::reduce(significanceModels.begin(), significanceModels.end());
y_pred /= sum; y_pred /= sum;
@ -105,23 +96,15 @@ namespace bayesnet {
{ {
auto n_states = models[0]->getClassNumStates(); auto n_states = models[0]->getClassNumStates();
std::vector<std::vector<double>> y_pred(X[0].size(), std::vector<double>(n_states, 0.0)); std::vector<std::vector<double>> y_pred(X[0].size(), std::vector<double>(n_states, 0.0));
auto threads{ std::vector<std::thread>() };
std::mutex mtx;
for (auto i = 0; i < n_models; ++i) { for (auto i = 0; i < n_models; ++i) {
threads.push_back(std::thread([&, i]() { auto ypredict = models[i]->predict_proba(X);
auto ypredict = models[i]->predict_proba(X); assert(ypredict.size() == y_pred.size());
assert(ypredict.size() == y_pred.size()); assert(ypredict[0].size() == y_pred[0].size());
assert(ypredict[0].size() == y_pred[0].size()); // Multiply each prediction by the significance of the model and then add it to the final prediction
std::lock_guard<std::mutex> lock(mtx); for (auto j = 0; j < ypredict.size(); ++j) {
// Multiply each prediction by the significance of the model and then add it to the final prediction std::transform(y_pred[j].begin(), y_pred[j].end(), ypredict[j].begin(), y_pred[j].begin(),
for (auto j = 0; j < ypredict.size(); ++j) { [significanceModels = significanceModels[i]](double x, double y) { return x + y * significanceModels; });
std::transform(y_pred[j].begin(), y_pred[j].end(), ypredict[j].begin(), y_pred[j].begin(), }
[significanceModels = significanceModels[i]](double x, double y) { return x + y * significanceModels; });
}
}));
}
for (auto& thread : threads) {
thread.join();
} }
auto sum = std::reduce(significanceModels.begin(), significanceModels.end()); auto sum = std::reduce(significanceModels.begin(), significanceModels.end());
//Divide each element of the prediction by the sum of the significances //Divide each element of the prediction by the sum of the significances
@ -141,17 +124,9 @@ namespace bayesnet {
{ {
// Build a m x n_models tensor with the predictions of each model // Build a m x n_models tensor with the predictions of each model
torch::Tensor y_pred = torch::zeros({ X.size(1), n_models }, torch::kInt32); torch::Tensor y_pred = torch::zeros({ X.size(1), n_models }, torch::kInt32);
auto threads{ std::vector<std::thread>() };
std::mutex mtx;
for (auto i = 0; i < n_models; ++i) { for (auto i = 0; i < n_models; ++i) {
threads.push_back(std::thread([&, i]() { auto ypredict = models[i]->predict(X);
auto ypredict = models[i]->predict(X); y_pred.index_put_({ "...", i }, ypredict);
std::lock_guard<std::mutex> lock(mtx);
y_pred.index_put_({ "...", i }, ypredict);
}));
}
for (auto& thread : threads) {
thread.join();
} }
return voting(y_pred); return voting(y_pred);
} }

View File

@ -1,33 +0,0 @@
#ifndef COUNTING_SEMAPHORE_H
#define COUNTING_SEMAPHORE_H
#include <mutex>
#include <condition_variable>
class CountingSemaphore {
public:
explicit CountingSemaphore(size_t max_count) : max_count_(max_count), count_(max_count) {}
// Acquires a permit, blocking if necessary until one becomes available
void acquire()
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() { return count_ > 0; });
--count_;
}
// Releases a permit, potentially waking up a blocked acquirer
void release()
{
std::lock_guard<std::mutex> lock(mtx_);
++count_;
if (count_ <= max_count_) {
cv_.notify_one();
}
}
private:
std::mutex mtx_;
std::condition_variable cv_;
size_t max_count_;
size_t count_;
};
#endif

View File

@ -8,22 +8,16 @@
#include <sstream> #include <sstream>
#include <numeric> #include <numeric>
#include <algorithm> #include <algorithm>
#include "CountingSemaphore.h"
#include "Network.h" #include "Network.h"
#include "bayesnet/utils/bayesnetUtils.h" #include "bayesnet/utils/bayesnetUtils.h"
#include "bayesnet/utils/CountingSemaphore.h"
#include <pthread.h>
namespace bayesnet { namespace bayesnet {
Network::Network() : fitted{ false }, maxThreads{ 0.95 }, classNumStates{ 0 } Network::Network() : fitted{ false }, classNumStates{ 0 }
{ {
maxThreadsRunning = std::max(1, static_cast<int>(std::thread::hardware_concurrency() * maxThreads));
maxThreadsRunning = std::min(maxThreadsRunning, static_cast<int>(std::thread::hardware_concurrency()));
}
Network::Network(float maxT) : fitted{ false }, maxThreads{ maxT }, classNumStates{ 0 }
{
maxThreadsRunning = std::max(1, static_cast<int>(std::thread::hardware_concurrency() * maxThreads));
maxThreadsRunning = std::min(maxThreadsRunning, static_cast<int>(std::thread::hardware_concurrency()));
} }
Network::Network(const Network& other) : features(other.features), className(other.className), classNumStates(other.getClassNumStates()), Network::Network(const Network& other) : features(other.features), className(other.className), classNumStates(other.getClassNumStates()),
maxThreads(other.getMaxThreads()), fitted(other.fitted), samples(other.samples), maxThreadsRunning(other.maxThreadsRunning) fitted(other.fitted), samples(other.samples)
{ {
if (samples.defined()) if (samples.defined())
samples = samples.clone(); samples = samples.clone();
@ -40,10 +34,6 @@ namespace bayesnet {
nodes.clear(); nodes.clear();
samples = torch::Tensor(); samples = torch::Tensor();
} }
float Network::getMaxThreads() const
{
return maxThreads;
}
torch::Tensor& Network::getSamples() torch::Tensor& Network::getSamples()
{ {
return samples; return samples;
@ -196,9 +186,11 @@ namespace bayesnet {
{ {
setStates(states); setStates(states);
std::vector<std::thread> threads; std::vector<std::thread> threads;
CountingSemaphore semaphore(maxThreadsRunning); auto& semaphore = CountingSemaphore::getInstance();
const double n_samples = static_cast<double>(samples.size(1)); const double n_samples = static_cast<double>(samples.size(1));
auto worker = [&](std::pair<const std::string, std::unique_ptr<Node>>& node) { auto worker = [&](std::pair<const std::string, std::unique_ptr<Node>>& node, int i) {
std::string threadName = "FitWorker-" + std::to_string(i);
pthread_setname_np(pthread_self(), threadName.c_str());
semaphore.acquire(); semaphore.acquire();
double numStates = static_cast<double>(node.second->getNumStates()); double numStates = static_cast<double>(node.second->getNumStates());
double smoothing_factor = 0.0; double smoothing_factor = 0.0;
@ -218,8 +210,9 @@ namespace bayesnet {
node.second->computeCPT(samples, features, smoothing_factor, weights); node.second->computeCPT(samples, features, smoothing_factor, weights);
semaphore.release(); semaphore.release();
}; };
int i = 0;
for (auto& node : nodes) { for (auto& node : nodes) {
threads.emplace_back(worker, std::ref(node)); threads.emplace_back(worker, std::ref(node), i++);
} }
for (auto& thread : threads) { for (auto& thread : threads) {
thread.join(); thread.join();
@ -345,12 +338,21 @@ namespace bayesnet {
} }
std::vector<double> Network::exactInference(std::map<std::string, int>& evidence) std::vector<double> Network::exactInference(std::map<std::string, int>& evidence)
{ {
//Implementar una cache para acelerar la inferencia.
// Cambiar la estrategia de crear hilos en la inferencia (por nodos como en fit?)
std::vector<double> result(classNumStates, 0.0); std::vector<double> result(classNumStates, 0.0);
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::mutex mtx; std::mutex mtx;
CountingSemaphore semaphore(maxThreadsRunning); auto& semaphore = CountingSemaphore::getInstance();
auto worker = [&](int i) { auto worker = [&](int i) {
semaphore.acquire(); semaphore.acquire();
std::string threadName = "InferenceWorker-" + std::to_string(i);
pthread_setname_np(pthread_self(), threadName.c_str());
auto completeEvidence = std::map<std::string, int>(evidence); auto completeEvidence = std::map<std::string, int>(evidence);
completeEvidence[getClassName()] = i; completeEvidence[getClassName()] = i;
double factor = computeFactor(completeEvidence); double factor = computeFactor(completeEvidence);

View File

@ -56,8 +56,6 @@ namespace bayesnet {
private: private:
std::map<std::string, std::unique_ptr<Node>> nodes; std::map<std::string, std::unique_ptr<Node>> nodes;
bool fitted; bool fitted;
float maxThreads = 0.95; // Coefficient to multiply by the number of threads available
int maxThreadsRunning; // Effective max number of threads running
int classNumStates; int classNumStates;
std::vector<std::string> features; // Including classname std::vector<std::string> features; // Including classname
std::string className; std::string className;

View File

@ -0,0 +1,46 @@
#ifndef COUNTING_SEMAPHORE_H
#define COUNTING_SEMAPHORE_H
#include <mutex>
#include <condition_variable>
#include <algorithm>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <thread>
class CountingSemaphore {
public:
static CountingSemaphore& getInstance()
{
static CountingSemaphore instance;
return instance;
}
// Delete copy constructor and assignment operator
CountingSemaphore(const CountingSemaphore&) = delete;
CountingSemaphore& operator=(const CountingSemaphore&) = delete;
void acquire()
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this]() { return count_ > 0; });
--count_;
}
void release()
{
std::lock_guard<std::mutex> lock(mtx_);
++count_;
if (count_ <= max_count_) {
cv_.notify_one();
}
}
private:
CountingSemaphore()
: max_count_(std::max(1u, static_cast<uint>(0.95 * std::thread::hardware_concurrency()))),
count_(max_count_)
{
}
std::mutex mtx_;
std::condition_variable cv_;
const uint max_count_;
uint count_;
};
#endif