I have a multiclass classification task that I want to solve using the Local Classifier per Parent Node (LCPN) approach.
Let me explain with an MWE.
Say I have this dummy dataset:
import numpy as np
from sklearn.datasets import make_classification
from scipy.cluster import hierarchy
from scipy.spatial.distance import squareform
X, y = make_classification(n_samples=1000, n_features=10, n_classes=5,
n_informative=4)
I came up with the following distance matrix between these classes:
d = np.array(
[[ 0., 201.537, 197.294, 200.823, 194.517],
[201.537, 0., 199.449, 202.941, 196.703],
[197.294, 199.449, 0., 198.728, 192.354],
[200.823, 202.941, 198.728, 0., 195.972],
[194.517, 196.703, 192.354, 195.972, 0. ]]
)
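One way to arrive at such a matrix, for example, is via euclidean distances between class centroids (a sketch; d_centroid is illustrative and need not reproduce the values above):
from scipy.spatial.distance import cdist
# per-class feature means, stacked into an (n_classes, n_features) array
centroids = np.stack([X[y == k].mean(axis=0) for k in np.unique(y)])
d_centroid = cdist(centroids, centroids)  # (n_classes, n_classes) distances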
So, I determined the class hierarchy like so:
hc = hierarchy.linkage(squareform(d), method='complete')  # linkage expects a condensed distance matrix
The dendrogram obtained is thus:
dendrogram = hierarchy.dendrogram(hc, labels=['A','B','C', 'D', 'F'])
[dendrogram figure]
I illustrate this in a tree-like structure, using hierarchy.to_tree():
[tree illustration]
My Question:
How do I fit a classifier such as a DecisionTreeClassifier or an SVM at each internal node (including the root), following the LCPN method, so that prediction proceeds as in the tree illustration above?
The PerParentNodeLocalClassifier estimator below works as follows.
Z is derived from the distance matrix and represented as a tree. Credit to @NickODell (this answer) for the graph recursion code, which uses graphviz to construct the graph visualisation; I modelled all the recursion routines on the same approach.
At inference time, we take one sample at a time and traverse the tree. The classifier at each node is used to determine whether we branch left (0) or right (1).
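Written out as a loop, the per-sample routing works roughly like this (a sketch of the recursive routine implemented below; classifiers, root and sample stand in for the fitted attributes):
# Sketch: route one sample down the cluster tree.
# `classifiers` maps node id -> fitted binary classifier;
# `root` is the ClusterNode returned by hierarchy.to_tree(Z).
node = root
while not node.is_leaf():
    go_right = classifiers[node.id].predict(sample.reshape(1, -1)).item()  # 0: left, 1: right
    node = node.right if go_right else node.left
prediction = node.id  # leaf ids coincide with the encoded class labels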
Some details about the implementation below:
- The default base classifier is a DecisionTreeClassifier. You can specify an alternative classifier using the estimator= parameter (see the example below).
- The default dissimilarity measure is dissimilarity="euclidean". You can alternatively supply your own distance matrix by setting dissimilarity="precomputed" and calling .fit(X, y, dissimilarity_matrix=<precomputed_matrix>).
- The implementation is not field-tested, so it's recommended to use the default verbose=1 level and keep plot_figures=True. There's useful summary information there for sanity-checking the estimator's logic. The higher verbosity level verbose=2 reports results on a per-sample basis.
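For example, to swap in a linear SVM as the base classifier (a sketch; any sklearn-compatible classifier should work):
from sklearn.svm import LinearSVC
clf = PerParentNodeLocalClassifier(estimator=LinearSVC())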
Let's take two test cases: your example data and a more complex case.
Outputs:

Building leaf assignments dict (0: left, 1:right)
classifier at parent node 8
comprises classes [1, 3, 0, 2, 4]
with cluster assignments [0, 1, 1, 1, 1]
classifier at parent node 7
comprises classes [3, 0, 2, 4]
with cluster assignments [0, 1, 1, 1]
...
Fitting classifiers
Fitting node8 classifier using 1000 samples (100% of the data)
left group: [1] (201 samples)
right group: [3, 0, 2, 4] (799 samples)
imbalance (≥1) ratio: 3.98
Fitting node7 classifier using 799 samples (80% of the data)
left group: [3] (199 samples)
right group: [0, 2, 4] (600 samples)
imbalance (≥1) ratio: 3.02
...
Initialising the estimator with verbose=2 gives per-sample reports, useful for checking individual predictions.
clf.verbose = 2
clf.predict(X[:1])
Predicting for sample
X_i: [ 0.93065833 -1.10836254 -1.27437476 0.16353084 -0.9043927 0.114701...
node8 is branching right (1)
node7 is branching left (0)
final prediction is encoded class=3 [y=3]
The test data and precomputed distance matrix:
import numpy as np
from sklearn.datasets import make_classification
from scipy.cluster import hierarchy
X, y = make_classification(
n_samples=1000, n_features=10, n_classes=5, n_informative=4, random_state=0
)
distance_matrix = np.array(
[[ 0., 201.537, 197.294, 200.823, 194.517],
[201.537, 0., 199.449, 202.941, 196.703],
[197.294, 199.449, 0., 198.728, 192.354],
[200.823, 202.941, 198.728, 0., 195.972],
[194.517, 196.703, 192.354, 195.972, 0. ]]
)
Fit and score:
clf = PerParentNodeLocalClassifier(
dissimilarity='precomputed',
linkage_method='complete',
# estimator=LinearSVC() #optionally specify an alternative base classifier
)
clf.fit(X, y, dissimilarity_matrix=distance_matrix)
#X_val, y_val: a held-out validation set, e.g. via sklearn.model_selection.train_test_split
clf.predict(X_val)
clf.score(X_val, y_val)
A more complex tree structure:

[tree visualisation for the more complex case]
Test data, this time without a precomputed distance matrix:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.datasets import make_classification
n_features = 7
n_classes = 5
X_a, y_a = make_classification(
n_samples=10_000,
n_features=n_features, n_informative=5,
n_redundant=0, n_repeated=0,
n_classes=n_classes, random_state=0
)
Usage:
clf = PerParentNodeLocalClassifier(random_state=0).fit(X_a, y_a)
The implementation in full:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from sklearn.base import BaseEstimator, clone, ClassifierMixin, MetaEstimatorMixin
from sklearn.utils.validation import check_array, check_X_y, check_is_fitted
from sklearn.preprocessing import LabelEncoder
from scipy.spatial.distance import squareform
from scipy.cluster import hierarchy
from sklearn.tree import DecisionTreeClassifier
from multiprocessing import cpu_count, Pool
from IPython.display import display #display() is used for rich output of DataFrames and graphviz objects
class PerParentNodeLocalClassifier(BaseEstimator, ClassifierMixin, MetaEstimatorMixin):
"""
A classifier implementing the "local classifier per parent node" (LCPN) algorithm.
    LCPN is a classification algorithm where local classification decisions are
made over a pre-defined class hierarchy, resulting in a final prediction
that reflects the hierarchical relationship between classes.
This implementation is a meta-estimator that fits a classifier at each parent
node. Parent nodes are derived from a class dissimilarity (distance) matrix
using agglomerative clustering. The class dissimilarity matrix is either
user-supplied or computed from the data during fitting.
Parameters
----------
estimator : object, default=None
The base estimator used for each local classifier. If None, then the
base estimator is DecisionTreeClassifier initialised with
`max_depth=5` and `class_weight="balanced"`.
linkage_method : {"single", "complete", "average", "weighted", "centroid",
"median", "ward"}, default="single"
The linkage method used for agglomerative clustering, supplied to
        scipy.cluster.hierarchy.linkage().
dissimilarity : {"precomputed", "euclidean"}, default="euclidean"
If "precomputed", the class dissimilarity (distance) matrix must be
supplied as `.fit(X, y, dissimilarity_matrix=...)`.
If "euclidean", a dissimilarity matrix will be computed from the data
during `.fit(X, y)`. In this case, the dissimilarity between each pair
of classes `(i, j)` is defined as the euclidean distance between their
centroids in feature space.
verbose : bool or int, default=1
Controls the verbosity when fitting and predicting. Use 1/True for
        general information, and 2 for per-sample details. 0 or False is not
        recommended, as this estimator has not been field-tested.
plot_figures : bool, default=True
If True, two plots will be rendered showing the dendrograms and the
decision tree. Useful for a visual check of the estimator's classification
logic. False is not recommended as this estimator has not been field-tested.
n_jobs : int, default=1
Specifies how many workers to dispatch when predicting. -1 will use
all CPUs. When set to 1, the samples will be evaluated strictly
sequentially.
random_state : int, RandomState instance or None, default=None
Controls the randomness of `estimator`, if applicable.
"""
def __init__(
self,
estimator=None,
linkage_method='single',
dissimilarity='euclidean',
verbose=1,
plot_figures=True,
n_jobs=1,
random_state=None
):
self.estimator = estimator
self.linkage_method = linkage_method
self.dissimilarity = dissimilarity
self.verbose = verbose
self.plot_figures = plot_figures
self.n_jobs = n_jobs
self.random_state = random_state
def fit(self, X, y, **kwargs):
"""
Fit the estimator. Optionally takes `dissimilarity_matrix=` parameter.
Parameters:
X (pd.DataFrame, np.ndarray): Features shaped (n_samples, n_features).
y (pd.Series, np.ndarray): Multi-class labels (n_samples,).
**kwargs
dissimilarity_matrix (np.ndarray): Required if `dissimilarity="precomputed"`,
shaped (n_classes, n_classes)
"""
#Input checks for sklearn estimators
X, y = check_X_y(X, y)
#sklearn checks
if hasattr(X, 'columns'):
self.feature_names_in_ = np.array(
X.columns, dtype='object'
)
self.n_features_in_ = X.shape[1]
#For simplicity always define a self.label_encoder_
# It won't make a difference if supplied y is encoded.
self.label_encoder_ = LabelEncoder().fit(y)
self.classes_original_ = self.label_encoder_.classes_
self.n_classes = len(self.classes_original_)
#encoded values are used for all computations
self.classes_ = self.label_encoder_.transform(self.classes_original_)
if self.verbose:
print(
f'\nSupplied classes', self.classes_original_, 'encoded to',
str(self.classes_),
'\nFigures display encoded values. Predictions returned in original format.'
)
self.X = X
self.y = self.label_encoder_.transform(y)
#Handle dissimilarity= param
# creates self.dissimilarity_matrix_
self.handle_dissimilarity_arg(kwargs)
if self.verbose:
matrix_df = pd.DataFrame(
self.dissimilarity_matrix_,
index=[i for i in self.classes_],
columns=[i for i in self.classes_]
).rename_axis(columns='class')
display(
matrix_df
.style
.set_caption(self.dissimilarity + ' dissimilarity matrix')
)
#Create linkage matrix
        # creates self.Z_ and self.Z_df_
self.create_linkage_matrix()
if self.verbose:
display(
self.Z_df_
.style
.set_caption('linkage matrix Z')
.background_gradient(subset='linkage_distance', cmap='plasma')
)
if self.plot_figures:
self.plot_dendrogram()
#Linkage matrix as tree
self.root_node_ = hierarchy.to_tree(self.Z_)
if self.plot_figures:
self.render_tree_visualisation()
#Leaf assignments
self.leaf_assignments_dict = {}
self.build_leaf_assignments_dict_recursive(
self.root_node_,
self.leaf_assignments_dict
)
#Fit classifiers
self.classifiers_dict_ = {}
if self.estimator is None:
self.estimator = DecisionTreeClassifier(
max_depth=5,
class_weight='balanced',
)
if self.verbose:
print('\nUsing default base estimator:\n', self.estimator)
if hasattr(self.estimator, 'random_state'):
self.estimator.set_params(**{'random_state': self.random_state})
self.fit_classifier_tree_recursive(self.root_node_, self.classifiers_dict_)
return self
def predict(self, X):
"""
Render a prediction for each sample.
"""
check_is_fitted(self)
X = check_array(X)
if not self.n_jobs:
raise ValueError('n_jobs must be a non-zero integer')
#Strictly sequential evaluation is useful for debugging
if self.n_jobs == 1:
predictions = []
            if self.verbose:
                print()
for i, sample in enumerate(X):
if self.verbose:
print(
f'Rendering prediction for sample',
f'{i+1:>5d} of {len(X):>5d} [{(i+1) / len(X):>5.1%}]...',
end='\r' if i + 1 != len(X) else 'done\n',
)
predictions.append( self.predict_sample(sample) )
else:
n_jobs = self.n_jobs if self.n_jobs > 0 else self.n_jobs + cpu_count() + 1
if self.verbose:
print(f'\n Predicting using {n_jobs} CPUs')
with Pool(n_jobs) as p:
predictions = p.map(self.predict_sample, X)
return self.label_encoder_.inverse_transform(predictions)
#
# Helper methods
#
def handle_dissimilarity_arg(self, fit_kwargs):
"""
Sets the dissimilarity matrix.
        Either computes it from X and y when dissimilarity="euclidean",
        or uses the dissimilarity_matrix supplied to
        .fit() when dissimilarity="precomputed".
"""
if self.dissimilarity == 'euclidean':
self.dissimilarity_matrix_ = self.compute_euclidean_class_dissimilarity_matrix()
elif self.dissimilarity == 'precomputed':
if 'dissimilarity_matrix' not in fit_kwargs:
raise TypeError(
".fit() requires 'dissimilarity_matrix=' argument "
"when dissimilarity='precomputed'"
)
dissimilarity_matrix = fit_kwargs['dissimilarity_matrix']
self.dissimilarity_matrix_ = self.sanitise_dissimilarity_matrix(
dissimilarity_matrix
)
def compute_euclidean_class_dissimilarity_matrix(self):
"""
Computes (n_classes, n_classes) dissimilarity (distance) matrix
using the euclidean distance between pairs of classes in feature space.
"""
class_centroids = np.stack(
[self.X[self.y==klass].mean(axis=0) for klass in self.classes_],
axis=0
)
dissimilarity_matrix = np.array([
[np.linalg.norm(class_centroids[i] - class_centroids[j]) for j in self.classes_]
for i in self.classes_
]).reshape(self.n_classes, self.n_classes)
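        #Equivalent vectorized form: scipy.spatial.distance.cdist(class_centroids, class_centroids)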
return self.sanitise_dissimilarity_matrix(dissimilarity_matrix)
def sanitise_dissimilarity_matrix(self, mtx):
"""
Checks if the supplied matrix is symmetrical and has a zero diagonal.
If not, it forces those conditions on the matrix.
"""
if np.array_equal(mtx, mtx.T) and np.abs(mtx).trace()==0.:
return mtx
if self.verbose:
print(
'\nDissimilarity matrix is either asymmetrical or has '
'non-zero diagonal. Coercing matrix for symmetry and diagonal=0.'
)
        #Enforce symmetry by averaging with the transpose, then zero the diagonal
        mtx_clean = (mtx + mtx.T) / 2
        np.fill_diagonal(mtx_clean, val=0)
return mtx_clean
def create_linkage_matrix(self):
"""
Uses scipy to create the linkage matrix Z, and a DataFrame version as well.
"""
self.Z_ = hierarchy.linkage(
squareform(self.dissimilarity_matrix_),
method=self.linkage_method,
)
self.Z_df_ = pd.DataFrame({
'class_i': self.Z_[:, 0].astype(int),
'class_j': self.Z_[:, 1].astype(int),
'linkage_distance': self.Z_[:, 2],
'total_members': self.Z_[:, 3].astype(int)
}).rename_axis(index='step')
def plot_dendrogram(self):
"""
Uses scipy to draw a dendrogram for each link point.
"""
f, axs = plt.subplots(
nrows=1, ncols=len(self.Z_df_), figsize=(11, 3),
sharey=True, layout='constrained'
)
for ax, cut_pt in zip(axs, self.Z_df_.linkage_distance):
dendrogram = hierarchy.dendrogram(
self.Z_,
leaf_font_size=10,
labels=self.classes_,
color_threshold=cut_pt,
above_threshold_color='darkslategray',
ax=ax
)
ax.axhline(cut_pt, linewidth=1.5, color='deeppink', linestyle=':')
ax.spines[['top', 'right', 'bottom']].set_visible(False)
ax.set_title(f'linkage step {np.argwhere(ax==axs).item()}')
#Formatting
ax = axs[0]
ax.set_xlabel('class')
ax.spines.left.set_visible(True)
ax.set_ylabel(self.linkage_method + ' linkage distance')
def render_tree_visualisation(self):
"""
Uses graphviz package to display the linkage matrix as a tree
"""
try:
import graphviz
except ModuleNotFoundError as e:
print(
                'Error importing graphviz package. '
'graphviz package is required for tree visualisation.\n', e
)
return
try:
            dot = graphviz.Digraph('cluster_hierarchy')
self.render_dot_tree_recursive(self.root_node_, dot)
display(dot)
except Exception as e:
print('Error rendering graph visualisation using graphviz\n', e)
def render_dot_tree_recursive(self, node, dot, parent=None):
if node.is_leaf():
dot.node(f"node{node.id}", f'class {node.id}')
if parent:
dot.edge(f"node{parent.id}", f"node{node.id}")
if node.left:
self.render_dot_tree_recursive(node.left, dot, node)
if node.right:
self.render_dot_tree_recursive(node.right, dot, node)
#Build classifier tree
def build_leaf_assignments_dict_recursive(self, node, res_dict, parent=None):
"""
Recursively populates a dictionary that identifies each parent node with
its class members.
"""
if node.is_leaf():
return
# 0 2 4 1 3 relevant members, left-right
# 0 1 0 1 1 assignments for [y0 y1 y2 y3 y4]
#assignments[lr membs] = relevant assignments, l-r order
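        #pre_order() lists this node's leaves in left-to-right order; cut_tree() at
        # this node's merge height yields cluster ids separating its two child branches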
leaves_lr = np.array(node.pre_order())
assignments_lr = hierarchy.cut_tree(self.Z_, height=node.dist).ravel()[leaves_lr]
#Convert left to 0, right as 1
assignments_lr = (assignments_lr - assignments_lr[0]).astype(bool).astype(int)
res_dict[node.id] = [leaves_lr, assignments_lr]
if self.verbose:
if parent is None:
print('\n', 'Building leaf assignments dict (0: left, 1:right)'.center(80, ' '))
print(
'classifier at parent node', node.id,
'\n comprises classes ', leaves_lr.tolist(),
'\n with cluster assignments', assignments_lr.tolist(),
)
self.build_leaf_assignments_dict_recursive(node.left, res_dict, node)
self.build_leaf_assignments_dict_recursive(node.right, res_dict, node)
def fit_classifier_tree_recursive(self, node, clf_dict, parent=None):
"""
Fit a classifier at each parent node, and populate a dict with the
classifiers.
"""
if node.is_leaf():
return
#leaves relevant to this node, and their assignments
leaves, assignments = self.leaf_assignments_dict[node.id]
#discard samples with irrelevant leaves
subset_ixs_bool = np.isin(self.y, leaves)
X_subset, y_subset = [arr[subset_ixs_bool] for arr in [self.X, self.y]]
#group of left classes, and right classes
# use to binarise y as 0:left group, 1:right group
left_group, right_group = [leaves[assignments==i] for i in [0, 1]]
y_bin = np.where(np.isin(y_subset, left_group), 0, 1)
#Fit a classifer for this node
clf = clone(self.estimator)
if self.verbose:
if not parent:
print('\n', 'Fitting classifiers'.center(80, ' '))
nsamples_left, nsamples_right = [sum(y_bin==i) for i in [0, 1]]
imbalance_ratio = (
max(nsamples_left, nsamples_right) / min(nsamples_left, nsamples_right)
)
print(
f'Fitting node{node.id} classifier using {y_bin.size} samples',
f'({y_bin.size / self.y.size:.0%} of the data)',
f'\n left group: {left_group.tolist()} ({nsamples_left} samples)',
f'\n right group: {right_group.tolist()} ({nsamples_right} samples)',
f'\n imbalance (≥1) ratio: {imbalance_ratio:>4.2f}',
)
clf.fit(X_subset, y_bin)
clf_dict[node.id] = clf
#recurse into child nodes
self.fit_classifier_tree_recursive(node.left, clf_dict, node)
self.fit_classifier_tree_recursive(node.right, clf_dict, node)
def predict_sample(self, sample):
"""
Prediction for a single sample.
"""
if self.verbose == 2:
print(
'\n', 'Predicting for sample'.center(80, ' '),
'\nX_i: ', str(sample)[:70] + '...',
)
if sample.ndim == 1:
sample = sample.reshape(1, -1)
prediction = [None] #recursion will deposit value in prediction[0]
self.predict_sample_recursive(sample, prediction, self.root_node_)
return prediction[0]
def predict_sample_recursive(self, sample, return_value_placeholder, node, parent=None):
if sample.ndim == 1:
sample = sample.reshape(1, -1)
if self.verbose == 2:
if node.is_leaf():
print(
f' final prediction is encoded class={node.id}',
f'[y={self.label_encoder_.inverse_transform([node.id]).item()}]'
)
#Reached leaf node/final prediction
if node.is_leaf():
return_value_placeholder[0] = node.id
return
#Get left/right prediction at this node
prediction = self.classifiers_dict_[node.id].predict(sample).item()
if self.verbose == 2:
print(f' node{node.id}', 'is branching', ['left (0)', 'right (1)'][prediction])
#Recurse into the left/right branch depending on "prediction"
self.predict_sample_recursive(
sample,
return_value_placeholder,
[node.left, node.right][prediction],
parent=node
)