
Hierarchical classification approach to a multiclass problem

I have a multiclass classification task, and my goal is to solve it using the Local Classifier per Parent Node (LCPN) approach.

Let me explain using an MWE.

Say I have this dummy dataset:

import numpy as np
from sklearn.datasets import make_classification
from scipy.cluster import hierarchy

X, y = make_classification(n_samples=1000, n_features=10, n_classes=5,
                             n_informative=4)

I came up with the distance matrix between these classes to be:

d = np.array(
[[  0.,    201.537, 197.294, 200.823, 194.517],
 [201.537,   0.,    199.449, 202.941, 196.703],
 [197.294, 199.449,   0.,    198.728, 192.354],
 [200.823, 202.941, 198.728,   0.,    195.972],
 [194.517, 196.703, 192.354, 195.972,   0.   ]]
)

So, I determined the class hierarchy like so:

hc = hierarchy.linkage(d, method='complete')

The dendrogram obtained is thus:

dendrogram = hierarchy.dendrogram(hc, labels=['A','B','C', 'D', 'F'])
dendrogram

[image: dendrogram of classes A, B, C, D, F]

Which I illustrate in a tree-like structure, using hierarchy.to_tree() as:

[image: tree diagram of the class hierarchy]

My Question:

How do I fit a classifier such as a DecisionTreeClassifier or SVM at each internal node (including the root), following the LCPN method, to proceed as in the tree illustration above?

super_ask asked Oct 26 '25 12:10


1 Answer

The PerParentNodeLocalClassifier estimator below works as follows.

  • A linkage matrix Z is derived from the distance matrix, and represented as a tree
  • For each parent node in the tree, the classes belonging to the left and right branches are grouped and binarised into 0 (left group) and 1 (right group).
  • Irrelevant classes for that node are removed, and a classifier is fitted to learn left/right predictions at that node.
  • We recurse through the tree, fitting a classifier as above for each parent node.
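
The grouping step in the list above can be sketched on its own, without fitting any classifiers. This is a minimal illustration rather than the estimator's actual code: it reuses the distance matrix from the question, and the helper name parent_node_groups is made up for this example.

```python
import numpy as np
from scipy.cluster import hierarchy
from scipy.spatial.distance import squareform

# Class distance matrix from the question (symmetric, zero diagonal)
d = np.array([
    [0.0, 201.537, 197.294, 200.823, 194.517],
    [201.537, 0.0, 199.449, 202.941, 196.703],
    [197.294, 199.449, 0.0, 198.728, 192.354],
    [200.823, 202.941, 198.728, 0.0, 195.972],
    [194.517, 196.703, 192.354, 195.972, 0.0],
])

# linkage() expects a condensed distance vector, hence squareform()
Z = hierarchy.linkage(squareform(d), method="complete")
root = hierarchy.to_tree(Z)


def parent_node_groups(node, groups=None):
    """Map each parent node id to its (left classes, right classes).

    Hypothetical helper for illustration only.
    """
    if groups is None:
        groups = {}
    if node.is_leaf():
        return groups
    # pre_order() lists the leaf (class) ids found under a node
    groups[node.id] = (
        sorted(node.left.pre_order()),
        sorted(node.right.pre_order()),
    )
    parent_node_groups(node.left, groups)
    parent_node_groups(node.right, groups)
    return groups


groups = parent_node_groups(root)
for node_id, (left, right) in sorted(groups.items(), reverse=True):
    print(f"node {node_id}: left={left} right={right}")
```

Each entry gives the two class groups that the classifier at that parent node would have to separate; classes outside both groups are the "irrelevant" ones dropped for that node.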

Credit to @NickODell, whose answer supplied the graph recursion code that uses graphviz to construct the graph visualisation. I modelled all the recursion routines on the same method.

At inference time, we take one sample at a time and traverse the tree. The classifier at each node is used to determine whether we branch left (0) or right (1).
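
To make the traversal concrete, here is a stripped-down sketch of fitting one binary classifier per parent node and then walking a sample from the root to a leaf. It is not the full estimator: the function names fit_nodes and predict_one, the 4-class toy data, and the fixed DecisionTreeClassifier settings are assumptions made for this example.

```python
import numpy as np
from scipy.cluster import hierarchy
from scipy.spatial.distance import pdist
from sklearn.datasets import make_classification
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(
    n_samples=500, n_features=10, n_informative=4, n_classes=4, random_state=0
)
classes = np.unique(y)

# Hierarchy built from Euclidean distances between class centroids
centroids = np.stack([X[y == k].mean(axis=0) for k in classes])
root = hierarchy.to_tree(hierarchy.linkage(pdist(centroids), method="complete"))


def fit_nodes(node, node_clfs):
    """Fit one binary left (0) / right (1) classifier per parent node."""
    if node.is_leaf():
        return
    left, right = node.left.pre_order(), node.right.pre_order()
    mask = np.isin(y, left + right)  # keep only classes under this node
    y_bin = np.isin(y[mask], right).astype(int)
    clf = DecisionTreeClassifier(max_depth=5, random_state=0)
    node_clfs[node.id] = clf.fit(X[mask], y_bin)
    fit_nodes(node.left, node_clfs)
    fit_nodes(node.right, node_clfs)


def predict_one(sample, node, node_clfs):
    """Walk from the root to a leaf, branching on each node's prediction."""
    while not node.is_leaf():
        go_right = node_clfs[node.id].predict(sample.reshape(1, -1))[0]
        node = node.right if go_right else node.left
    return node.id  # a leaf id is the (encoded) class index


node_clfs = {}
fit_nodes(root, node_clfs)
preds = np.array([predict_one(x, root, node_clfs) for x in X])
print("training accuracy:", (preds == y).mean())
```

A wrong decision near the root cannot be undone further down the tree, which is one reason the per-node imbalance and sample counts are worth inspecting.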

Some details about the implementation below:

  • By default the base estimator is a DecisionTreeClassifier. You can specify an alternative classifier using the estimator= parameter.
  • The estimator computes the class distance matrix during fitting when dissimilarity="euclidean". You can alternatively specify your own distance matrix by setting dissimilarity="precomputed" and then .fit(X, y, dissimilarity_matrix=<precomputed_matrix>).
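
For reference, the "euclidean" option amounts to measuring the distance between class centroids. A minimal sketch of that computation, assuming labels are already integer-encoded and using scipy's pdist and squareform instead of the estimator's own nested loop:

```python
import numpy as np
from scipy.spatial.distance import pdist, squareform
from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=1000, n_features=10, n_classes=5, n_informative=4, random_state=0
)
classes = np.unique(y)

# One centroid (mean feature vector) per class, shaped (n_classes, n_features)
centroids = np.stack([X[y == k].mean(axis=0) for k in classes])

# pdist gives the condensed pairwise distances; squareform expands them
# into the symmetric (n_classes, n_classes) matrix with a zero diagonal
D = squareform(pdist(centroids, metric="euclidean"))
print(np.round(D, 3))
```

A matrix built this way could also be passed back in through dissimilarity="precomputed", which is handy if you want to swap in a different notion of class distance.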

The implementation is not field-tested, so it's recommended to use the default verbose=1 level and keep plot_figures=True. There's useful summary information there for sanity-checking the estimator's logic. The higher verbosity level verbose=2 reports results on a per-sample basis.

Let's take two test cases: your example data and a more complex case.

Test case 1

Outputs:

                Building leaf assignments dict (0: left, 1:right)                
classifier at parent node 8 
 comprises classes        [1, 3, 0, 2, 4] 
 with cluster assignments [0, 1, 1, 1, 1]
classifier at parent node 7 
 comprises classes        [3, 0, 2, 4] 
 with cluster assignments [0, 1, 1, 1]
...
                               Fitting classifiers                               
Fitting node8 classifier using 1000 samples (100% of the data) 
 left group:  [1] (201 samples) 
 right group: [3, 0, 2, 4] (799 samples) 
 imbalance (≥1) ratio: 3.98
Fitting node7 classifier using 799 samples (80% of the data) 
 left group:  [3] (199 samples) 
 right group: [0, 2, 4] (600 samples) 
 imbalance (≥1) ratio: 3.02
...

Setting verbose=2 gives sample-wise reports, useful for checking individual samples.

clf.verbose = 2
clf.predict(X[:1])
                             Predicting for sample                               
X_i:  [ 0.93065833 -1.10836254 -1.27437476  0.16353084 -0.9043927   0.114701...
 node8 is branching right (1)
 node7 is branching left (0)
 final prediction is encoded class=3 [y=3]

The test data and precomputed distance matrix:

import numpy as np
from sklearn.datasets import make_classification
from scipy.cluster import hierarchy

X, y = make_classification(
    n_samples=1000, n_features=10, n_classes=5, n_informative=4, random_state=0
)

distance_matrix = np.array(
    [[  0.,    201.537, 197.294, 200.823, 194.517],
     [201.537,   0.,    199.449, 202.941, 196.703],
     [197.294, 199.449,   0.,    198.728, 192.354],
     [200.823, 202.941, 198.728,   0.,    195.972],
     [194.517, 196.703, 192.354, 195.972,   0.   ]]
)

Fit and score:

clf = PerParentNodeLocalClassifier(
    dissimilarity='precomputed',
    linkage_method='complete',
    # estimator=LinearSVC() #optionally specify an alternative base classifier
)

clf.fit(X, y, dissimilarity_matrix=distance_matrix)

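# X_val, y_val are held-out arrays that are not defined in this snippet;
# substitute your own validation split (or X, y for a training-set check)
clf.predict(X_val)
clf.score(X_val, y_val)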

Test case 2

A more complex tree structure. [image: tree diagram]

Test data, this time without a precomputed distance matrix:

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from sklearn.datasets import make_classification

n_features = 7
n_classes = 5
X_a, y_a = make_classification(
    n_samples=10_000,
    n_features=n_features, n_informative=5,
    n_redundant=0, n_repeated=0,
    n_classes=n_classes, random_state=0
)

Usage:

clf = PerParentNodeLocalClassifier(random_state=0).fit(X_a, y_a)

Classifier implementation

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt

from sklearn.base import BaseEstimator, clone, ClassifierMixin, MetaEstimatorMixin
from sklearn.utils.validation import check_array, check_X_y, check_is_fitted
from sklearn.preprocessing import LabelEncoder

from scipy.spatial.distance import squareform
from scipy.cluster import hierarchy

from sklearn.tree import DecisionTreeClassifier
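from multiprocessing import cpu_count, Pool

#display() is only built in inside notebooks, so import it explicitly
from IPython.display import display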

class PerParentNodeLocalClassifier(ClassifierMixin, MetaEstimatorMixin, BaseEstimator):
    """
    A classifier implementing the "local classifier per parent node" (LCPN) algorithm.
    
    LCPN is a classification algorithm where local classification decisions are
    made over a pre-defined class hierarchy, resulting in a final prediction
    that reflects the hierarchical relationship between classes.
    
    This implementation is a meta-estimator that fits a classifier at each parent
    node. Parent nodes are derived from a class dissimilarity (distance) matrix
    using agglomerative clustering. The class dissimilarity matrix is either
    user-supplied or computed from the data during fitting.
    
    Parameters
    ----------
    estimator : object, default=None
        The base estimator used for each local classifier. If None, then the
        base estimator is DecisionTreeClassifier initialised with
        `max_depth=5` and `class_weight="balanced"`.
    
    linkage_method : {"single", "complete", "average", "weighted", "centroid",
                      "median", "ward"}, default="single"
        The linkage method used for agglomerative clustering, supplied to
        scipy.cluster.hierarchy.linkage().
    
    dissimilarity : {"precomputed", "euclidean"}, default="euclidean"
        If "precomputed", the class dissimilarity (distance) matrix must be
        supplied as `.fit(X, y, dissimilarity_matrix=...)`.
        
        If "euclidean", a dissimilarity matrix will be computed from the data
        during `.fit(X, y)`. In this case, the dissimilarity between each pair
        of classes `(i, j)` is defined as the euclidean distance between their
        centroids in feature space.
    
    verbose : bool or int, default=1
        Controls the verbosity when fitting and predicting. Use 1/True for
        general information, and 2 for per-sample details. 0 or False is not
        recommended, as this estimator has not been field-tested.
    
    plot_figures : bool, default=True
        If True, two plots will be rendered showing the dendrograms and the
        decision tree. Useful for a visual check of the estimator's classification
        logic. False is not recommended as this estimator has not been field-tested.
    
    n_jobs : int, default=1
        Specifies how many workers to dispatch when predicting. -1 will use
        all CPUs. When set to 1, the samples will be evaluated strictly
        sequentially.
    
    random_state : int, RandomState instance or None, default=None
        Controls the randomness of `estimator`, if applicable.
    """
    def __init__(
        self,
        estimator=None,
        linkage_method='single',
        dissimilarity='euclidean',
        verbose=1,
        plot_figures=True,
        n_jobs=1,
        random_state=None
    ):
        self.estimator = estimator
        self.linkage_method = linkage_method
        self.dissimilarity = dissimilarity
        self.verbose = verbose
        self.plot_figures = plot_figures
        self.n_jobs = n_jobs
        self.random_state = random_state
        
    def fit(self, X, y, **kwargs):
        """
        Fit the estimator. Optionally takes `dissimilarity_matrix=` parameter.
        
        Parameters:
        X (pd.DataFrame, np.ndarray): Features shaped (n_samples, n_features).
        y (pd.Series, np.ndarray): Multi-class labels (n_samples,).
        
        **kwargs
        dissimilarity_matrix (np.ndarray): Required if `dissimilarity="precomputed"`,
                                           shaped (n_classes, n_classes)
        """
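        #Record feature names before validation, because check_X_y()
        # converts a DataFrame to an ndarray and drops the column labels
        if hasattr(X, 'columns'):
            self.feature_names_in_ = np.array(
                X.columns, dtype='object'
            )
        
        #Input checks for sklearn estimators
        X, y = check_X_y(X, y)
        self.n_features_in_ = X.shape[1]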
        
        #For simplicity always define a self.label_encoder_
        # It won't make a difference if supplied y is encoded.
        self.label_encoder_ = LabelEncoder().fit(y)
        self.classes_original_ = self.label_encoder_.classes_
        self.n_classes = len(self.classes_original_)
        
        #encoded values are used for all computations
        self.classes_ = self.label_encoder_.transform(self.classes_original_)
        
        if self.verbose:
            print(
                f'\nSupplied classes', self.classes_original_, 'encoded to',
                str(self.classes_),
                '\nFigures display encoded values. Predictions returned in original format.'
            )
        
        self.X = X
        self.y = self.label_encoder_.transform(y)
        
        #Handle dissimilarity= param
        # creates self.dissimilarity_matrix_
        self.handle_dissimilarity_arg(kwargs)
        if self.verbose:
            matrix_df = pd.DataFrame(
                self.dissimilarity_matrix_,
                index=[i for i in self.classes_],
                columns=[i for i in self.classes_]
            ).rename_axis(columns='class')
            
            display(
                matrix_df
                .style
                .set_caption(self.dissimilarity + ' dissimilarity matrix')
            )
        
        #Create linkage matrix
        # creates self.Z_ and self.Z_df_
        self.create_linkage_matrix()
        if self.verbose:
            display(
                self.Z_df_
                .style
                .set_caption('linkage matrix Z')
                .background_gradient(subset='linkage_distance', cmap='plasma')
            )
        if self.plot_figures:
            self.plot_dendrogram()
        
        #Linkage matrix as tree
        self.root_node_ = hierarchy.to_tree(self.Z_)
        if self.plot_figures:
            self.render_tree_visualisation()
        
        #Leaf assignments
        self.leaf_assignments_dict = {}
        self.build_leaf_assignments_dict_recursive(
            self.root_node_,
            self.leaf_assignments_dict
        )
        
        #Fit classifiers
        self.classifiers_dict_ = {}
        
        if self.estimator is None:
            self.estimator = DecisionTreeClassifier(
                max_depth=5,
                class_weight='balanced',
            )
            
            if self.verbose:
                print('\nUsing default base estimator:\n', self.estimator)
        
        if hasattr(self.estimator, 'random_state'):
            self.estimator.set_params(**{'random_state': self.random_state})
            
        self.fit_classifier_tree_recursive(self.root_node_, self.classifiers_dict_)
        
        return self
    
    def predict(self, X):
        """
        Render a prediction for each sample.
        """
        check_is_fitted(self)
        
        X = check_array(X)
        
        if not self.n_jobs:
            raise ValueError('n_jobs must be a non-zero integer')
        
        #Strictly sequential evaluation is useful for debugging
        if self.n_jobs == 1:
            predictions = []
            print()
            for i, sample in enumerate(X):
                if self.verbose:
                    print(
                        f'Rendering prediction for sample',
                        f'{i+1:>5d} of {len(X):>5d} [{(i+1) / len(X):>5.1%}]...',
                        end='\r' if i + 1 != len(X) else 'done\n',
                    )
                predictions.append( self.predict_sample(sample) )
            
        else:
            n_jobs = self.n_jobs if self.n_jobs > 0 else self.n_jobs + cpu_count() + 1
            if self.verbose:
                print(f'\n Predicting using {n_jobs} CPUs')
            
            with Pool(n_jobs) as p:
                predictions = p.map(self.predict_sample, X)
        
        return self.label_encoder_.inverse_transform(predictions)
    
    #
    # Helper methods
    #
    
    def handle_dissimilarity_arg(self, fit_kwargs):
        """
        Sets the dissimilarity matrix.
        
        Either compute it from X and y if dissimilarity="euclidean",
        or use dissimilarity_matrix supplied to
        .fit() when dissimilarity='precomputed'.
        
        """
        if self.dissimilarity == 'euclidean':
            self.dissimilarity_matrix_ = self.compute_euclidean_class_dissimilarity_matrix()
        elif self.dissimilarity == 'precomputed':
            if 'dissimilarity_matrix' not in fit_kwargs:
                raise TypeError(
                    ".fit() requires 'dissimilarity_matrix=' argument "
                    "when dissimilarity='precomputed'"
                )
            
            dissimilarity_matrix = fit_kwargs['dissimilarity_matrix']
                
            self.dissimilarity_matrix_ = self.sanitise_dissimilarity_matrix(
                dissimilarity_matrix
            )
        else:
            #Fail early on an unsupported option rather than later
            # with a missing dissimilarity_matrix_ attribute
            raise ValueError(
                "dissimilarity must be 'euclidean' or 'precomputed', "
                f"got {self.dissimilarity!r}"
            )
            
    def compute_euclidean_class_dissimilarity_matrix(self):
        """
        Computes (n_classes, n_classes) dissimilarity (distance) matrix
        using the euclidean distance between pairs of classes in feature space.
        """
        class_centroids = np.stack(
            [self.X[self.y==klass].mean(axis=0) for klass in self.classes_],
            axis=0
        )
        
        dissimilarity_matrix = np.array([
            [np.linalg.norm(class_centroids[i] - class_centroids[j]) for j in self.classes_]
            for i in self.classes_
        ]).reshape(self.n_classes, self.n_classes)
        
        return self.sanitise_dissimilarity_matrix(dissimilarity_matrix)


    def sanitise_dissimilarity_matrix(self, mtx):
        """
        Checks if the supplied matrix is symmetrical and has a zero diagonal.
        If not, it forces those conditions on the matrix.
        """
        #Accept array-likes such as nested lists
        mtx = np.asarray(mtx, dtype=float)
        
        if np.array_equal(mtx, mtx.T) and np.abs(mtx).trace()==0.:
            return mtx
        
        if self.verbose:
            print(
                '\nDissimilarity matrix is either asymmetrical or has '
                'non-zero diagonal. Coercing matrix for symmetry and diagonal=0.'
            )
        
        #Enforce symmetry, diag=0
        mtx_clean = (mtx + mtx.T) / 2
        np.fill_diagonal(mtx_clean, val=0)
        return mtx_clean

    
    def create_linkage_matrix(self):
        """
        Uses scipy to create the linkage matrix Z, and a DataFrame version as well.
        """
        self.Z_ = hierarchy.linkage(
            squareform(self.dissimilarity_matrix_),
            method=self.linkage_method,
        )
        
        self.Z_df_ = pd.DataFrame({
            'class_i': self.Z_[:, 0].astype(int),
            'class_j': self.Z_[:, 1].astype(int),
            'linkage_distance': self.Z_[:, 2],
            'total_members': self.Z_[:, 3].astype(int)
        }).rename_axis(index='step')
    
    
    def plot_dendrogram(self):
        """
        Uses scipy to draw a dendrogram for each link point.
        """
        #squeeze=False keeps axs an array even with a single linkage step
        f, axs = plt.subplots(
            nrows=1, ncols=len(self.Z_df_), figsize=(11, 3),
            sharey=True, layout='constrained', squeeze=False
        )
        axs = axs.ravel()
        
        for step, (ax, cut_pt) in enumerate(zip(axs, self.Z_df_.linkage_distance)):
            dendrogram = hierarchy.dendrogram(
                self.Z_,
                leaf_font_size=10,
                labels=self.classes_,
                color_threshold=cut_pt,
                above_threshold_color='darkslategray',
                ax=ax
            )
            ax.axhline(cut_pt, linewidth=1.5, color='deeppink', linestyle=':')
            ax.spines[['top', 'right', 'bottom']].set_visible(False)
            ax.set_title(f'linkage step {step}')
        #Formatting
        ax = axs[0]
        ax.set_xlabel('class')
        ax.spines.left.set_visible(True)
        ax.set_ylabel(self.linkage_method + ' linkage distance')
    
    def render_tree_visualisation(self):
        """
        Uses graphviz package to display the linkage matrix as a tree
        """
        try:
            import graphviz
        except ModuleNotFoundError as e:
            print(
                'Error importing graphviz package. '
                'graphviz package is required for tree visualisation.\n', e
            )
            return
        
        try:
            dot = graphviz.Digraph('cluster_heirarchy')
            self.render_dot_tree_recursive(self.root_node_, dot)
            display(dot)
        except Exception as e:
            print('Error rendering graph visualisation using graphviz\n', e)
            
            
    def render_dot_tree_recursive(self, node, dot, parent=None):
        if node.is_leaf():
            dot.node(f"node{node.id}", f'class {node.id}')
        
        if parent:
            dot.edge(f"node{parent.id}", f"node{node.id}")
        if node.left:
            self.render_dot_tree_recursive(node.left, dot, node)
        if node.right:
            self.render_dot_tree_recursive(node.right, dot, node)
        
        
    #Build classifier tree
    def build_leaf_assignments_dict_recursive(self, node, res_dict, parent=None):
        """
        Recursively populates a dictionary that identifies each parent node with
        its class members.
        """
        if node.is_leaf():
            return
        
        # 0 2 4 1 3 relevant members, left-right
        # 0 1 0 1 1 assignments for [y0 y1 y2 y3 y4]
        #assignments[lr membs] = relevant assignments, l-r order
        leaves_lr = np.array(node.pre_order())
        assignments_lr = hierarchy.cut_tree(self.Z_, height=node.dist).ravel()[leaves_lr]
        
        #Convert left to 0, right  as 1
        assignments_lr = (assignments_lr - assignments_lr[0]).astype(bool).astype(int)
        
        res_dict[node.id] = [leaves_lr, assignments_lr]
        
        if self.verbose:
            if parent is None:
                print('\n', 'Building leaf assignments dict (0: left, 1:right)'.center(80, ' '))
            print(
                'classifier at parent node', node.id,
                '\n comprises classes       ', leaves_lr.tolist(),
                '\n with cluster assignments', assignments_lr.tolist(),
            )
        
        self.build_leaf_assignments_dict_recursive(node.left, res_dict, node)
        self.build_leaf_assignments_dict_recursive(node.right, res_dict, node)
    
    
    def fit_classifier_tree_recursive(self, node, clf_dict, parent=None):
        """
        Fit a classifier at each parent node, and populate a dict with the
        classifiers.
        """
        if node.is_leaf():
            return
        
        #leaves relevant to this node, and their assignments
        leaves, assignments = self.leaf_assignments_dict[node.id]
        
        #discard samples with irrelevant leaves
        subset_ixs_bool = np.isin(self.y, leaves)
        X_subset, y_subset = [arr[subset_ixs_bool] for arr in [self.X, self.y]]
        
        #group of left classes, and right classes
        # use to binarise y as 0:left group, 1:right group
        left_group, right_group = [leaves[assignments==i] for i in [0, 1]]
        y_bin = np.where(np.isin(y_subset, left_group), 0, 1)
        
        #Fit a classifier for this node
        clf = clone(self.estimator)

        if self.verbose:
            if not parent:
                print('\n', 'Fitting classifiers'.center(80, ' '))
                
            nsamples_left, nsamples_right = [sum(y_bin==i) for i in [0, 1]]
            imbalance_ratio = (
                max(nsamples_left, nsamples_right) / min(nsamples_left, nsamples_right)
            )
            print(
                f'Fitting node{node.id} classifier using {y_bin.size} samples',
                f'({y_bin.size / self.y.size:.0%} of the data)',
                
                f'\n left group:  {left_group.tolist()} ({nsamples_left} samples)',
                f'\n right group: {right_group.tolist()} ({nsamples_right} samples)',
                f'\n imbalance (≥1) ratio: {imbalance_ratio:>4.2f}',
            )

        clf.fit(X_subset, y_bin)
        clf_dict[node.id] = clf
        
        #recurse into child nodes
        self.fit_classifier_tree_recursive(node.left, clf_dict, node)
        self.fit_classifier_tree_recursive(node.right, clf_dict, node)
    
    
    def predict_sample(self, sample):
        """
        Prediction for a single sample.
        """
        if self.verbose == 2:
            print(
                '\n', 'Predicting for sample'.center(80, ' '),
                '\nX_i: ', str(sample)[:70] + '...',
            )
            
        if sample.ndim == 1:
            sample = sample.reshape(1, -1)
        
        prediction = [None] #recursion will deposit value in prediction[0]
        self.predict_sample_recursive(sample, prediction, self.root_node_)
        return prediction[0]
        
    def predict_sample_recursive(self, sample, return_value_placeholder, node, parent=None):
        if sample.ndim == 1:
            sample = sample.reshape(1, -1)
        
        #Reached leaf node/final prediction
        if node.is_leaf():
            if self.verbose == 2:
                print(
                    f' final prediction is encoded class={node.id}',
                    f'[y={self.label_encoder_.inverse_transform([node.id]).item()}]'
                )
            return_value_placeholder[0] = node.id
            return
        
        #Get left/right prediction at this node
        prediction = self.classifiers_dict_[node.id].predict(sample).item()
        if self.verbose == 2:
            print(f' node{node.id}', 'is branching', ['left (0)', 'right (1)'][prediction])
        
        #Recurse into the left/right branch depending on "prediction"
        self.predict_sample_recursive(
            sample,
            return_value_placeholder,
            [node.left, node.right][prediction],
            parent=node
        )

MuhammedYunus SaveGaza answered Oct 29 '25 02:10


