Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Geographical distance by group - Applying a function on each pair of rows

I want to calculate the average geographical distance between a number of houses per province.

Suppose I have the following data.

df1 <- data.frame(province = c(1, 1, 1, 2, 2, 2),
              house = c(1, 2, 3, 4, 5, 6),
              lat = c(-76.6, -76.5, -76.4, -75.4, -80.9, -85.7), 
              lon = c(39.2, 39.1, 39.3, 60.8, 53.3, 40.2))

Using the geosphere library I can find the distance between two houses. For instance:

library(geosphere)
distm(c(df1$lon[1], df1$lat[1]), c(df1$lon[2], df1$lat[2]), fun = distHaversine)

#11429.1

How do I calculate the distance between all the houses in the province and gather the mean distance per province?

The original data-set has millions of observations per province, so performance is an issue here, too.

like image 505
wake_wake Avatar asked Apr 10 '19 21:04

wake_wake


People also ask

Which r function can be used to compute the distance between two observations?

dist() function To do this in R, we use the dist function to calculate the euclidean distance between our observations. The function simply requires a data frame or matrix containing your observations and features.

Which r function can be used to compute the distance between two observations dir dist diff dim?

R provides an inbuilt dist() function using which we can calculate six different kinds of distances between each unique pair of vectors in a two-dimensional vector. dist() method accepts a numeric matrix as an argument and a method that represent the type of distance to be measured.


1 Answers

My initial idea was to look at the source code of distHaversine and replicate it in a function that I would use with proxy. That would work like this (note that lon is expected to be the first column):

library(geosphere)
library(dplyr)
library(proxy)

df1 <- data.frame(province = as.integer(c(1, 1, 1, 2, 2, 2)),
                  house = as.integer(c(1, 2, 3, 4, 5, 6)),
                  lat = c(-76.6, -76.5, -76.4, -75.4, -80.9, -85.7), 
                  lon = c(39.2, 39.1, 39.3, 60.8, 53.3, 40.2))

custom_haversine <- function(x, y) {
  toRad <- pi / 180

  diff <- (y - x) * toRad
  dLon <- diff[1L]
  dLat <- diff[2L]

  a <- sin(dLat / 2) ^ 2 + cos(x[2L] * toRad) * cos(y[2L] * toRad) * sin(dLon / 2) ^ 2
  a <- min(a, 1)
  # return
  2 * atan2(sqrt(a), sqrt(1 - a)) * 6378137
}

pr_DB$set_entry(FUN=custom_haversine, names="haversine", loop=TRUE, distance=TRUE)

average_dist <- df1 %>%
  select(-house) %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="haversine"))))

However, if you're expecting millions of rows per province, proxy probably won't be able to allocate the intermediate (lower triangular of the) matrices. So I ported the code to C++ and added multi-threading as a bonus:

EDIT: turns out the s2d helper was far from optimal, this version now uses the formulas given here.

EDIT2: I just found out about RcppThread, and it can be used to detect user interrupt.

// [[Rcpp::plugins(cpp11)]]
// [[Rcpp::depends(RcppParallel,RcppThread)]]

#include <cstddef> // size_t
#include <math.h> // sin, cos, sqrt, atan2, pow
#include <vector>

#include <RcppThread.h>
#include <Rcpp.h>
#include <RcppParallel.h>

using namespace std;
using namespace Rcpp;
using namespace RcppParallel;

// single to double indices for lower triangular of matrices without diagonal
void s2d(const size_t id, const size_t nrow, size_t& i, size_t& j) {
  j = nrow - 2 - static_cast<size_t>(sqrt(-8 * id + 4 * nrow * (nrow - 1) - 7) / 2 - 0.5);
  i = id + j + 1 - nrow * (nrow - 1) / 2 + (nrow - j) * ((nrow - j) - 1) / 2;
}

class HaversineCalculator : public Worker
{
public:
  HaversineCalculator(const NumericVector& lon,
                      const NumericVector& lat,
                      double& avg,
                      const int n)
    : lon_(lon)
    , lat_(lat)
    , avg_(avg)
    , n_(n)
    , cos_lat_(lon.length())
  {
    // terms for distance calculation
    for (size_t i = 0; i < cos_lat_.size(); i++) {
      cos_lat_[i] = cos(lat_[i] * 3.1415926535897 / 180);
    }
  }

  void operator()(size_t begin, size_t end) {
    // for Kahan summation
    double sum = 0;
    double c = 0;

    double to_rad = 3.1415926535897 / 180;

    size_t i, j;
    for (size_t ind = begin; ind < end; ind++) {
      if (RcppThread::isInterrupted(ind % static_cast<int>(1e5) == 0)) return;

      s2d(ind, lon_.length(), i, j);

      // haversine distance
      double d_lon = (lon_[j] - lon_[i]) * to_rad;
      double d_lat = (lat_[j] - lat_[i]) * to_rad;
      double d_hav = pow(sin(d_lat / 2), 2) + cos_lat_[i] * cos_lat_[j] * pow(sin(d_lon / 2), 2);
      if (d_hav > 1) d_hav = 1;
      d_hav = 2 * atan2(sqrt(d_hav), sqrt(1 - d_hav)) * 6378137;

      // the average part
      d_hav /= n_;

      // Kahan sum step
      double y = d_hav - c;
      double t = sum + y;
      c = (t - sum) - y;
      sum = t;
    }

    mutex_.lock();
    avg_ += sum;
    mutex_.unlock();
  }

private:
  const RVector<double> lon_;
  const RVector<double> lat_;
  double& avg_;
  const int n_;
  tthread::mutex mutex_;
  vector<double> cos_lat_;
};

// [[Rcpp::export]]
double avg_haversine(const DataFrame& input, const int nthreads) {
  NumericVector lon = input["lon"];
  NumericVector lat = input["lat"];

  double avg = 0;
  int size = lon.length() * (lon.length() - 1) / 2;
  HaversineCalculator hc(lon, lat, avg, size);

  int grain = size / nthreads / 10;
  RcppParallel::parallelFor(0, size, hc, grain);
  RcppThread::checkUserInterrupt();

  return avg;
}

This code won't allocate any intermediate matrix, it will simply calculate the distance for each pair of what would be the lower triangular and accumulate the values for an average in the end. See here for the Kahan summation part.

If you save that code in, say, haversine.cpp, then you can do the following:

library(dplyr)
library(Rcpp)
library(RcppParallel)
library(RcppThread)

sourceCpp("haversine.cpp")

df1 %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=avg_haversine(.x, parallel::detectCores())))
# A tibble: 2 x 2
# Groups:   province [2]
  province     avg
     <int>   <dbl>
1        1  15379.
2        2 793612.

Here's a sanity check too:

pr_DB$set_entry(FUN=geosphere::distHaversine, names="distHaversine", loop=TRUE, distance=TRUE)

df1 %>%
  select(-house) %>%
  group_by(province) %>%
  group_map(~ data.frame(avg=mean(proxy::dist(.x[ , c("lon", "lat")], method="distHaversine"))))

A word of caution though:

df <- data.frame(lon=runif(1e3, -90, 90), lat=runif(1e3, -90, 90))

system.time(proxy::dist(df, method="distHaversine"))
   user  system elapsed 
 34.353   0.005  34.394

system.time(proxy::dist(df, method="haversine"))
   user  system elapsed 
  0.789   0.020   0.809

system.time(avg_haversine(df, 4L))
   user  system elapsed 
  0.054   0.000   0.014

df <- data.frame(lon=runif(1e5, -90, 90), lat=runif(1e5, -90, 90))

system.time(avg_haversine(df, 4L))
   user  system elapsed 
 73.861   0.238  19.670

You'll probably have to wait quite a while if you have millions of rows...

I should also mention that it's not possible to detect user interrupt inside the threads created through RcppParallel, so if you start the calculation you should either wait until it finishes, or restart R/RStudio entirely. See EDIT2 above.


Regarding complexity

Depending on your actual data and how many cores your computer has, you may very well end up waiting days for the calculation to finish. This problem has quadratic complexity (per province, so to speak). This line:

int size = lon.length() * (lon.length() - 1) / 2;

signifies the amount of (haversine) distance calculations that must be performed. So if the number of rows increases by a factor of n, the number of calculations increases by a factor of n^2 / 2, roughly speaking.

There is no way to optimize this; you can't calculate the average of N numbers without actually computing each number first, and you'll have a hard time finding something faster than multi-threaded C++ code, so you'll either have to wait it out, or throw more cores at the problem, either with a single machine or with many machines working together. Otherwise you can't solve this problem.

like image 155
Alexis Avatar answered Nov 10 '22 13:11

Alexis