How to perform initialization in Spark?

I want to perform GeoIP lookups of my data in Spark. To do that I'm using MaxMind's GeoIP database.

What I want to do is initialize a GeoIP database object once on each partition, and later use it to look up the city associated with an IP address.

Does Spark have an initialization phase for each node, or should I instead check whether an instance variable is undefined and, if so, initialize it before continuing? E.g. something like (this is Python, but I want a Scala solution):

class IPLookup(object):
    database = None

    def getCity(self, ip):
        if self.database is None:
            self.database = self.initialise(geoipPath)
        ...

Of course, doing this requires that Spark serialise the whole object, something the docs caution against.

asked Nov 21 '14 by jbrown

People also ask

How do you initialize Spark?

To create a SparkContext you first need to build a SparkConf object that contains information about your application. Only one SparkContext may be active per JVM; you must stop() the active SparkContext before creating a new one.
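A minimal sketch of that initialization sequence (the app name and `local[*]` master are illustrative assumptions, not from the question):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Build a SparkConf describing the application, then create the
// single SparkContext allowed in this JVM.
val conf = new SparkConf()
  .setAppName("geoip-lookup")   // assumed name, for illustration
  .setMaster("local[*]")        // assumed local mode, for illustration
val sc = new SparkContext(conf)

// ... run jobs with sc ...

// Must be called before another SparkContext can be created in this JVM.
sc.stop()
```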

What is the command to initialize Spark using Scala in terminal?

Go to the Apache Spark Installation directory from the command line and type bin/spark-shell and press enter, this launches Spark shell and gives you a scala prompt to interact with Spark in scala language. If you have set the Spark in a PATH then just enter spark-shell in command line or terminal (mac users).

What does first() do in Spark?

In Spark, the first() action returns the first element of the dataset. It is similar to take(1).
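For example, assuming an existing SparkContext `sc`:

```scala
// first() returns the single first element; take(1) returns it
// wrapped in an Array.
val rdd = sc.parallelize(Seq(10, 20, 30))
rdd.first()       // 10
rdd.take(1).head  // 10, equivalent result
```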


2 Answers

In Spark, per-partition operations can be done using:

def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false)

This mapper will execute the function f once per partition over an iterator of that partition's elements. The idea is that the cost of setting up resources (like DB connections) is amortized over the many elements consumed from the iterator.

Example:

val logsRDD = ???
logsRDD.mapPartitions { iter =>
  // initialized once per partition, not once per element
  val geoIp = new GeoIPLookupDB(...)
  // this is a local map over the iterator - do not confuse with rdd.map
  iter.map(elem => (geoIp.resolve(elem.ip), elem))
}
answered Nov 02 '22 by maasg


This seems like a good use case for a broadcast variable. Have you looked at the documentation for that functionality, and if so, does it fail to meet your requirements in some way?
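A rough sketch of the broadcast approach, assuming the GeoIP database file is small enough to ship to every executor; `GeoIPLookupDB` is the hypothetical wrapper from the other answer, here assumed to accept raw bytes, and `geoipPath` is the path from the question:

```scala
import java.nio.file.{Files, Paths}

// Read the database file once on the driver and broadcast the bytes;
// each executor receives one read-only copy.
val dbBytes: Array[Byte] = Files.readAllBytes(Paths.get(geoipPath))
val dbBroadcast = sc.broadcast(dbBytes)

logsRDD.mapPartitions { iter =>
  // Deserialize once per partition from the broadcast value, avoiding
  // serializing the lookup object itself with the closure.
  val geoIp = new GeoIPLookupDB(dbBroadcast.value)
  iter.map(elem => (geoIp.resolve(elem.ip), elem))
}
```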

answered Nov 02 '22 by bearrito