Add python implementation of the algorithm

This commit is contained in:
2022-12-07 01:27:28 +01:00
parent 7f4b09d2d6
commit 89c7366c4e
5 changed files with 212 additions and 73 deletions

BIN
fimdlp/main Executable file

Binary file not shown.

View File

@@ -1,5 +1,6 @@
import numpy as np
from .cppfimdlp import CFImdlp
from .pyfimdlp import PyFImdlp
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.multiclass import unique_labels
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
@@ -85,7 +86,7 @@ class FImdlp(TransformerMixin, BaseEstimator):
self.cut_points_ = [None] * self.n_features_
# Can do it in parallel
for feature in self.features_:
self.discretizer_[feature] = CFImdlp(proposal=self.proposal)
self.discretizer_[feature] = PyFImdlp(proposal=self.proposal)
self.discretizer_[feature].fit(X[:, feature], y)
self.cut_points_[feature] = self.discretizer_[
feature
@@ -149,5 +150,5 @@ class FImdlp(TransformerMixin, BaseEstimator):
def get_cut_points(self):
result = []
for feature in range(self.n_features_):
result.append(self.cut_points_[feature][:-1])
result.append(self.cut_points_[feature])
return result

142
fimdlp/pyfimdlp.py Normal file
View File

@@ -0,0 +1,142 @@
import numpy as np
from math import log
from types import SimpleNamespace
class PyFImdlp:
def __init__(self, proposal=True):
self.proposal = proposal
self.n_features_ = None
self.X_ = None
self.y_ = None
self.features_ = None
self.cut_points_ = []
self.entropy_cache = {}
self.information_gain_cache = {}
def fit(self, X, y):
self.n_features_ = len(X)
self.indices_ = np.argsort(X)
self.use_indices = True
self.X_ = X[self.indices_] if not self.use_indices else X
self.y_ = y[self.indices_] if not self.use_indices else y
self.compute_cut_points(0, len(y))
return self
def get_cut_points(self):
return sorted(list(set([cut.value for cut in self.cut_points_])))
def compute_cut_points(self, start, end):
cut = self.get_candidate(start, end)
if cut.value is None:
return
if self.mdlp(cut, start, end):
print("¡Ding!", cut.value, cut.index)
self.cut_points_.append(cut)
self.compute_cut_points(start, cut.index)
self.compute_cut_points(cut.index, end)
def mdlp(self, cut, start, end):
N = end - start
k = self.num_classes(start, end)
k1 = self.num_classes(start, cut.index)
k2 = self.num_classes(cut.index, end)
ent = self.entropy(start, end)
ent1 = self.entropy(start, cut.index)
ent2 = self.entropy(cut.index, end)
ig = self.information_gain(start, cut.index, end)
delta = log(pow(3, k) - 2, 2) - (
float(k) * ent - float(k1) * ent1 - float(k2) * ent2
)
term = 1 / N * (log(N - 1, 2) + delta)
return ig > term
def num_classes(self, start, end):
n_classes = set()
for i in range(start, end):
n_classes.add(
self.y_[self.indices_[i]] if self.use_indices else self.y_[i]
)
return len(n_classes)
def get_candidate(self, start, end):
"""Return the best cutpoint candidate for the given range.
Parameters
----------
start : int
Start of the range.
end : int
End of the range.
Returns
-------
candidate : SimpleNamespace with attributes index and value
value == None if no candidate is found.
"""
candidate = SimpleNamespace()
candidate.value = None
minEntropy = float("inf")
for idx in range(start + 1, end):
condition = (
self.y_[self.indices_[idx]] == self.y_[self.indices_[idx - 1]]
if self.use_indices
else self.y_[idx] == self.y_[idx - 1]
)
if condition:
continue
entropy_left = self.entropy(start, idx)
entropy_right = self.entropy(idx, end)
entropy_cut = entropy_left + entropy_right
if entropy_cut < minEntropy:
minEntropy = entropy_cut
candidate.index = idx
if self.use_indices:
candidate.value = (
self.X_[self.indices_[idx]]
+ self.X_[self.indices_[idx - 1]]
) / 2
else:
candidate.value = (self.X_[idx] + self.X_[idx - 1]) / 2
return candidate
def entropy(self, start, end) -> float:
n_labels = end - start
if n_labels <= 1:
return 0
if (start, end) in self.entropy_cache:
return self.entropy_cache[(start, end)]
if self.use_indices:
counts = np.bincount(self.y_[self.indices_[start:end]])
else:
counts = np.bincount(self.y_[start:end])
proportions = counts / n_labels
n_classes = np.count_nonzero(proportions)
if n_classes <= 1:
return 0
entropy = 0.0
# Compute standard entropy.
for prop in proportions:
if prop != 0.0:
entropy -= prop * log(prop, 2)
self.entropy_cache[(start, end)] = entropy
return entropy
def information_gain(self, start, cut, end):
if (start, cut, end) in self.information_gain_cache:
return self.information_gain_cache[(start, cut, end)]
labels = end - start
if labels == 0:
return 0.0
entropy = self.entropy(start, end)
card_left = cut - start
entropy_left = self.entropy(start, cut)
card_right = end - cut
entropy_right = self.entropy(cut, end)
result = (
entropy
- (card_left / labels) * entropy_left
- (card_right / labels) * entropy_right
)
self.information_gain_cache[(start, cut, end)] = result
return result

View File

@@ -31,13 +31,9 @@ class FImdlpTest(unittest.TestCase):
[0.75, 1.399999976158142, 1.5],
]
self.assertListEqual(expected, clf.get_cut_points())
self.assertListEqual(
["feature_0", "feature_1", "feature_2", "feature_3"], clf.features_
)
self.assertEqual("class", clf.class_name_)
clf.fit(X, y, features=["a", "b", "c", "d"], class_name="class_name")
self.assertListEqual(["a", "b", "c", "d"], clf.features_)
self.assertEqual("class_name", clf.class_name_)
self.assertListEqual([0, 1, 2, 3], clf.features_)
clf.fit(X, y, features=[0, 2, 3])
self.assertListEqual([0, 2, 3], clf.features_)
def test_fit_Errors(self):
clf = FImdlp()

128
sample.py
View File

@@ -61,69 +61,69 @@ data = load_iris()
X = data.data
y = data.target
features = data.feature_names
# test = FImdlp()
# test.fit(X, y, features=features)
# test.transform(X)
# test.get_cut_points()
for proposal in [True, False]:
X = data.data
y = data.target
print("*** Proposal: ", proposal)
test = CFImdlp(debug=True, proposal=proposal)
test.fit(X[:, 0], y)
result = test.get_cut_points()
for item in result:
print(
f"Class={item['classNumber']} - ({item['start']:3d}, "
f"{item['end']:3d}) -> ({item['fromValue']:3.1f}, "
f"{item['toValue']:3.1f}]"
)
print(test.get_discretized_values())
print("+" * 40)
X = np.array(
[
[5.1, 3.5, 1.4, 0.2],
[5.2, 3.0, 1.4, 0.2],
[5.3, 3.2, 1.3, 0.2],
[5.4, 3.1, 1.5, 0.2],
]
)
y = np.array([0, 0, 0, 1])
print(test.fit(X[:, 0], y).transform(X[:, 0]))
result = test.get_cut_points()
for item in result:
print(
f"Class={item['classNumber']} - ({item['start']:3d}, {item['end']:3d})"
f" -> ({item['fromValue']:3.1f}, {item['toValue']:3.1f}]"
)
print("*" * 40)
# print(Xs, ys)
# print("**********************")
# test = [(0, 3), (4, 4), (5, 5), (6, 8), (9, 9)]
# print(ys)
# for start, end in test:
# print("Testing ", start, end, ys[:end], ys[end:])
# print("Information gain: ", information_gain(ys, ys[:end], ys[end:]))
# print(test.transform(X))
# print(X)
# print(indices)
# print(np.array(X)[indices])
test = FImdlp()
test.fit(X, y)
test.transform(X)
print(test.get_cut_points())
# for proposal in [True, False]:
# X = data.data
# y = data.target
# print("*** Proposal: ", proposal)
# test = CFImdlp(debug=True, proposal=proposal)
# test.fit(X[:, 0], y)
# result = test.get_cut_points()
# for item in result:
# print(
# f"Class={item['classNumber']} - ({item['start']:3d}, "
# f"{item['end']:3d}) -> ({item['fromValue']:3.1f}, "
# f"{item['toValue']:3.1f}]"
# )
# print(test.get_discretized_values())
# print("+" * 40)
# X = np.array(
# [
# [5.1, 3.5, 1.4, 0.2],
# [5.2, 3.0, 1.4, 0.2],
# [5.3, 3.2, 1.3, 0.2],
# [5.4, 3.1, 1.5, 0.2],
# ]
# )
# y = np.array([0, 0, 0, 1])
# print(test.fit(X[:, 0], y).transform(X[:, 0]))
# result = test.get_cut_points()
# for item in result:
# print(
# f"Class={item['classNumber']} - ({item['start']:3d}, {item['end']:3d})"
# f" -> ({item['fromValue']:3.1f}, {item['toValue']:3.1f}]"
# )
# print("*" * 40)
# # print(Xs, ys)
# # print("**********************")
# # test = [(0, 3), (4, 4), (5, 5), (6, 8), (9, 9)]
# # print(ys)
# # for start, end in test:
# # print("Testing ", start, end, ys[:end], ys[end:])
# # print("Information gain: ", information_gain(ys, ys[:end], ys[end:]))
# # print(test.transform(X))
# # print(X)
# # print(indices)
# # print(np.array(X)[indices])
# # k = test.cut_points(X[:, 0], y)
# # print(k)
# # k = test.cut_points_ant(X[:, 0], y)
# # print(k)
# # test.debug_points(X[:, 0], y)
# X = [5.7, 5.3, 5.2, 5.1, 5.0, 5.6, 5.1, 6.0, 5.1, 5.9]
# y = [1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
# indices = [4, 3, 6, 8, 2, 1, 5, 0, 9, 7]
# clf = CFImdlp(debug=True, proposal=False)
# clf.fit(X, y)
# print(clf.get_cut_points())
# y = [1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
# # To check
# indices2 = np.argsort(X)
# Xs = np.array(X)[indices2]
# ys = np.array(y)[indices2]
kdd_JapaneseVowels
# # # k = test.cut_points(X[:, 0], y)
# # # print(k)
# # # k = test.cut_points_ant(X[:, 0], y)
# # # print(k)
# # # test.debug_points(X[:, 0], y)
# # X = [5.7, 5.3, 5.2, 5.1, 5.0, 5.6, 5.1, 6.0, 5.1, 5.9]
# # y = [1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
# # indices = [4, 3, 6, 8, 2, 1, 5, 0, 9, 7]
# # clf = CFImdlp(debug=True, proposal=False)
# # clf.fit(X, y)
# # print(clf.get_cut_points())
# # y = [1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
# # # To check
# # indices2 = np.argsort(X)
# # Xs = np.array(X)[indices2]
# # ys = np.array(y)[indices2]
# kdd_JapaneseVowels