Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is it possible to combine multiple map and reduce functions into a single pass in Scala?

I have multiple map functions running over the same data and I'd like to have them run in a single pass. I'm looking for a generic way to do this.

val fruits: Seq[String] = Seq("apple", "banana", "cherry")

def mapF(s: String): Char = s.head
def reduceF(c1: Char, c2: Char): Char = if(c1 > c2) c1 else c2

def mapG(s: String): Int = s.length
def reduceG(i1: Int, i2: Int): Int = i1 + i2

val largestStartingChar = fruits.map(mapF).reduce(reduceF)
val totalStringLength = fruits.map(mapG).reduce(reduceG)

I'd like to reduce the number of passes over fruits. I can make this generic for two maps and reduces like this:

def productMapFunction[A, B, C](f: A=>B, g: A=>C): A => (B, C) = {
  x => (f(x), g(x))
}

def productReduceFunction[T, U](f: (T, T)=>T, g: (U, U) => U):
    ((T,U), (T,U)) => (T, U) = {
  (tu1, tu2) => (f(tu1._1, tu2._1), g(tu1._2, tu2._2))
}

val xMapFG = productMapFunction(mapF, mapG)
val xReduceFG = productReduceFunction(reduceF, reduceG)

val (largestStartingChar2, totalStringLength2) = 
  fruits.map(xMapFG).reduce(xReduceFG))

I'd like to do this even more generically, with an arbitrary number of map and reduce functions, but I'm not sure how to proceed, or if this is possible.

like image 258
Michael K Avatar asked Oct 16 '22 04:10

Michael K


1 Answers

Following solution uses Cats 2 and custom type MapReduce.

Reducing operation can be specified by function reduce: (O, O) => O or cats reducer: Semigroup[O]. Multiple MapReduce objects can be combined into one by Apply instance provided by implicit def mapReduceApply[I]

import cats._
import cats.implicits._

trait MapReduce[I, O] {
  type R

  def reducer: Semigroup[R]

  def map: I => R

  def mapResult: R => O

  def apply(input: Seq[I]): O = mapResult(input.map(map).reduce(reducer.combine))
}

object MapReduce {
  def apply[I, O, _R](_reducer: Semigroup[_R], _map: I => _R, _mapResult: _R => O): MapReduce[I, O] =
    new MapReduce[I, O] {
      override type R = _R

      override def reducer = _reducer

      override def map = _map

      override def mapResult = _mapResult
    }

  def apply[I, O](map: I => O)(implicit r: Semigroup[O]): MapReduce[I, O] =
    MapReduce[I, O, O](r, map, identity)

  def apply[I, O](map: I => O, reduce: (O, O) => O): MapReduce[I, O] = {
    val reducer = new Semigroup[O] {
      override def combine(x: O, y: O): O = reduce(x, y)
    }
    MapReduce(map)(reducer)
  }

  implicit def mapReduceApply[I] =
    new Apply[({type F[X] = MapReduce[I, X]})#F] {
      override def map[A, B](f: MapReduce[I, A])(fn: A => B): MapReduce[I, B] =
        MapReduce(f.reducer, f.map, f.mapResult.andThen(fn))

      override def ap[A, B](ff: MapReduce[I, (A) => B])(fa: MapReduce[I, A]): MapReduce[I, B] =
        MapReduce(ff.reducer product fa.reducer,
          i => (ff.map(i), fa.map(i)),
          (t: (ff.R, fa.R)) => ff.mapResult(t._1)(fa.mapResult(t._2))
        )
    }

}

object MultiMapReduce extends App {

  val fruits: Seq[String] = Seq("apple", "banana", "cherry")

  def mapF(s: String): Char = s.head

  def reduceF(c1: Char, c2: Char): Char = if (c1 > c2) c1 else c2

  val biggestFirsChar = MapReduce(mapF, reduceF)
  val totalChars = MapReduce[String, Int](_.length) // (Semigroup[Int]) reduce by _ + _
  def count[A] = MapReduce[A, Int](_ => 1)

  val multiMapReduce = (biggestFirsChar, totalChars, count[String]).mapN((_, _, _))
  println(multiMapReduce(fruits))

  val sum = MapReduce[Double, Double](identity)
  val average = (sum, count[Double]).mapN(_ / _)
  println(sum(List(1, 2, 3, 4)))
  println(average(List(1, 2, 3, 4)))

}

Runnable version is also available on GitHub.

like image 188
Yuriy Tumakha Avatar answered Oct 19 '22 01:10

Yuriy Tumakha