Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use Dynamic Time warping with kNN in python

I have a time-series dataset with two lables (0 and 1). I am using Dynamic Time Warping (DTW) as a similarity measure for classification using k-nearest neighbour (kNN) as described in these two wonderful blog posts:

  • https://nbviewer.jupyter.org/github/markdregan/K-Nearest-Neighbors-with-Dynamic-Time-Warping/blob/master/K_Nearest_Neighbor_Dynamic_Time_Warping.ipynb
  • http://alexminnaar.com/2014/04/16/Time-Series-Classification-and-Clustering-with-Python.html

    Arguments
    ---------
    n_neighbors : int, optional (default = 5)
        Number of neighbors to use by default for KNN
    
    max_warping_window : int, optional (default = infinity)
        Maximum warping window allowed by the DTW dynamic
        programming function
    
    subsample_step : int, optional (default = 1)
        Step size for the timeseries array. By setting subsample_step = 2,
        the timeseries length will be reduced by 50% because every second
        item is skipped. Implemented by x[:, ::subsample_step]
    """
    
    def __init__(self, n_neighbors=5, max_warping_window=10000, subsample_step=1):
        self.n_neighbors = n_neighbors
        self.max_warping_window = max_warping_window
        self.subsample_step = subsample_step
    
    def fit(self, x, l):
        """Fit the model using x as training data and l as class labels
    
        Arguments
        ---------
        x : array of shape [n_samples, n_timepoints]
            Training data set for input into KNN classifer
    
        l : array of shape [n_samples]
            Training labels for input into KNN classifier
        """
    
        self.x = x
        self.l = l
    
    def _dtw_distance(self, ts_a, ts_b, d = lambda x,y: abs(x-y)):
        """Returns the DTW similarity distance between two 2-D
        timeseries numpy arrays.
    
        Arguments
        ---------
        ts_a, ts_b : array of shape [n_samples, n_timepoints]
            Two arrays containing n_samples of timeseries data
            whose DTW distance between each sample of A and B
            will be compared
    
        d : DistanceMetric object (default = abs(x-y))
            the distance measure used for A_i - B_j in the
            DTW dynamic programming function
    
        Returns
        -------
        DTW distance between A and B
        """
    
        # Create cost matrix via broadcasting with large int
        ts_a, ts_b = np.array(ts_a), np.array(ts_b)
        M, N = len(ts_a), len(ts_b)
        cost = sys.maxint * np.ones((M, N))
    
        # Initialize the first row and column
        cost[0, 0] = d(ts_a[0], ts_b[0])
        for i in xrange(1, M):
            cost[i, 0] = cost[i-1, 0] + d(ts_a[i], ts_b[0])
    
        for j in xrange(1, N):
            cost[0, j] = cost[0, j-1] + d(ts_a[0], ts_b[j])
    
        # Populate rest of cost matrix within window
        for i in xrange(1, M):
            for j in xrange(max(1, i - self.max_warping_window),
                            min(N, i + self.max_warping_window)):
                choices = cost[i - 1, j - 1], cost[i, j-1], cost[i-1, j]
                cost[i, j] = min(choices) + d(ts_a[i], ts_b[j])
    
        # Return DTW distance given window 
        return cost[-1, -1]
    
    def _dist_matrix(self, x, y):
        """Computes the M x N distance matrix between the training
        dataset and testing dataset (y) using the DTW distance measure
    
        Arguments
        ---------
        x : array of shape [n_samples, n_timepoints]
    
        y : array of shape [n_samples, n_timepoints]
    
        Returns
        -------
        Distance matrix between each item of x and y with
            shape [training_n_samples, testing_n_samples]
        """
    
        # Compute the distance matrix        
        dm_count = 0
    
        # Compute condensed distance matrix (upper triangle) of pairwise dtw distances
        # when x and y are the same array
        if(np.array_equal(x, y)):
            x_s = np.shape(x)
            dm = np.zeros((x_s[0] * (x_s[0] - 1)) // 2, dtype=np.double)
    
            p = ProgressBar(shape(dm)[0])
    
            for i in xrange(0, x_s[0] - 1):
                for j in xrange(i + 1, x_s[0]):
                    dm[dm_count] = self._dtw_distance(x[i, ::self.subsample_step],
                                                      y[j, ::self.subsample_step])
    
                    dm_count += 1
                    p.animate(dm_count)
    
            # Convert to squareform
            dm = squareform(dm)
            return dm
    
        # Compute full distance matrix of dtw distnces between x and y
        else:
            x_s = np.shape(x)
            y_s = np.shape(y)
            dm = np.zeros((x_s[0], y_s[0])) 
            dm_size = x_s[0]*y_s[0]
    
            p = ProgressBar(dm_size)
    
            for i in xrange(0, x_s[0]):
                for j in xrange(0, y_s[0]):
                    dm[i, j] = self._dtw_distance(x[i, ::self.subsample_step],
                                                  y[j, ::self.subsample_step])
                    # Update progress bar
                    dm_count += 1
                    p.animate(dm_count)
    
            return dm
    
    def predict(self, x):
        """Predict the class labels or probability estimates for 
        the provided data
    
        Arguments
        ---------
          x : array of shape [n_samples, n_timepoints]
              Array containing the testing data set to be classified
    
        Returns
        -------
          2 arrays representing:
              (1) the predicted class labels 
              (2) the knn label count probability
        """
    
        dm = self._dist_matrix(x, self.x)
    
        # Identify the k nearest neighbors
        knn_idx = dm.argsort()[:, :self.n_neighbors]
    
        # Identify k nearest labels
        knn_labels = self.l[knn_idx]
    
        # Model Label
        mode_data = mode(knn_labels, axis=1)
        mode_label = mode_data[0]
        mode_proba = mode_data[1]/self.n_neighbors
    
        return mode_label.ravel(), mode_proba.ravel()
    

However, for classification with kNN the two posts use their own kNN algorithms.

I want to use sklearn's options such as gridsearchcv in my classification. Therefore, I would like to know how I can use Dynamic Time Warping (DTW) with sklearn kNN.

Note: I am not limited to sklearn and happy to receive answers in other libraries as well

I am happy to provide more details if needed.

like image 801
EmJ Avatar asked Jul 13 '19 01:07

EmJ


People also ask

What is dynamic time warping Python?

Dynamic Time Warping is used to compare the similarity or calculate the distance between two arrays or time series with different length.

What is dynamic time warping algorithm?

In time series analysis, dynamic time warping (DTW) is an algorithm for measuring similarity between two temporal sequences, which may vary in speed.

How can I improve my KNN model accuracy?

The key to improve the algorithm is to add a preprocessing stage to make the final algorithm run with more efficient data and then improve the effect of classification. The experimental results show that the improved KNN algorithm improves the accuracy and efficiency of classification.


1 Answers

You can use a custom metric for KNN. Therefore you only need to implement DTW yourself (or use/adapt any existing DTW implementation in python) [gist of this code].

import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report

#toy dataset 
X = np.random.random((100,10))
y = np.random.randint(0,2, (100))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)

#custom metric
def DTW(a, b):   
    an = a.size
    bn = b.size
    pointwise_distance = distance.cdist(a.reshape(-1,1),b.reshape(-1,1))
    cumdist = np.matrix(np.ones((an+1,bn+1)) * np.inf)
    cumdist[0,0] = 0

    for ai in range(an):
        for bi in range(bn):
            minimum_cost = np.min([cumdist[ai, bi+1],
                                   cumdist[ai+1, bi],
                                   cumdist[ai, bi]])
            cumdist[ai+1, bi+1] = pointwise_distance[ai,bi] + minimum_cost

    return cumdist[an, bn]

#train
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)



#evaluate
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

Which yields

Fitting 3 folds for each of 3 candidates, totalling 9 fits        

[Parallel(n_jobs=1)]: Done   9 out of   9 | elapsed:   29.0s finished

                         precision    recall  f1-score   support

                      0       0.57      0.89      0.70        18
                      1       0.60      0.20      0.30        15

            avg / total       0.58      0.58      0.52        33
like image 138
Nikolas Rieble Avatar answered Sep 23 '22 13:09

Nikolas Rieble