To ease development of my MapReduce tasks, I test them with a simple map reducer I wrote before actually deploying them to Hadoop:
object mapreduce {
  import scala.collection.JavaConversions._
  val intermediate = new java.util.HashMap[String, java.util.List[Int]]
  //> intermediate : java.util.HashMap[String,java.util.List[Int]] = {}
  val result = new java.util.ArrayList[Int] //> result : java.util.ArrayList[Int] = []
  def emitIntermediate(key: String, value: Int) {
    if (!intermediate.containsKey(key)) {
      intermediate.put(key, new java.util.ArrayList)
    }
    intermediate.get(key).add(value)
  } //> emitIntermediate: (key: String, value: Int)Unit
  def emit(value: Int) {
    println("value is " + value)
    result.add(value)
  } //> emit: (value: Int)Unit
  def execute(data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit) {
    for (line <- data) {
      mapper(line)
    }
    for (keyVal <- intermediate) {
      reducer(keyVal._1, intermediate.get(keyVal._1))
    }
    for (item <- result) {
      println(item)
    }
  } //> execute: (data: java.util.List[String], mapper: String => Unit, reducer: (String, java.util.List[Int]) => Unit)Unit
  def mapper(record: String) {
    var jsonAttributes = com.nebhale.jsonpath.JsonPath.read("$", record, classOf[java.util.ArrayList[String]])
    println("jsonAttributes are " + jsonAttributes)
    var key = jsonAttributes.get(0)
    var value = jsonAttributes.get(1)
    println("key is " + key)
    var delims = "[ ]+";
    var words = value.split(delims);
    for (w <- words) {
      emitIntermediate(w, 1)
    }
  } //> mapper: (record: String)Unit
  def reducer(key: String, listOfValues: java.util.List[Int]) = {
    var total = 0
    for (value <- listOfValues) {
      total += value;
    }
    emit(total)
  } //> reducer: (key: String, listOfValues: java.util.List[Int])Unit
  var dataToProcess = new java.util.ArrayList[String]
  //> dataToProcess : java.util.ArrayList[String] = []
  dataToProcess.add("[\"test1\" , \"test1 here is another test1 test1 \"]")
  //> res0: Boolean = true
  dataToProcess.add("[\"test2\" , \"test2 here is another test2 test1 \"]")
  //> res1: Boolean = true
  execute(dataToProcess, mapper, reducer) //> jsonAttributes are [test1, test1 here is another test1 test1 ]
  //| key is test1
  //| jsonAttributes are [test2, test2 here is another test2 test1 ]
  //| key is test2
  //| value is 2
  //| value is 2
  //| value is 4
  //| value is 2
  //| value is 2
  //| 2
  //| 2
  //| 4
  //| 2
  //| 2
  for (keyValue <- intermediate) {
    println(keyValue._1 + "->" + keyValue._2.size) //> another->2
    //| is->2
    //| test1->4
    //| here->2
    //| test2->2
  }
}
This lets me run my MapReduce tasks within the Eclipse IDE on Windows before deploying to the actual Hadoop cluster. I would like to do something similar for Spark: write Spark code in Eclipse and test it before deploying to a Spark cluster. Is this possible with Spark? Since Spark runs on top of Hadoop, does this mean I cannot run Spark without first having Hadoop installed? In other words, can I run the following code using just the Spark libraries?
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
taken from https://spark.apache.org/docs/0.9.0/quick-start.html#a-standalone-app-in-scala
If so, which Spark libraries do I need to include in my project?
Use IntelliJ to create the application: start IntelliJ IDEA and select Create New Project to open the New Project window. Select Apache Spark/HDInsight from the left pane, then select Spark Project (Scala) from the main window.
Add the following to your build.sbt:
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
and make sure your scalaVersion is set (e.g. scalaVersion := "2.10.3").
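For reference, a complete build.sbt along these lines might look like the sketch below. The project name and version are assumptions, chosen only because they match the jar name in the question (simple-project_2.10-1.0.jar); the dependency and scalaVersion lines are the ones given above.

```scala
// build.sbt (sketch): name and version are assumed, not prescribed.
name := "Simple Project"

version := "1.0"

// Spark 0.9.x is built against Scala 2.10.
scalaVersion := "2.10.3"

// %% appends the Scala binary version, resolving to spark-core_2.10.
libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
```

With this in place, the Spark classes should be available on the project's classpath without a separate Spark or Hadoop installation.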
Also, if you're just running the program locally, you can skip the last two arguments to SparkContext, as follows:
val sc = new SparkContext("local", "Simple App")
Finally, Spark can run on Hadoop, but it can also run in standalone mode. See: https://spark.apache.org/docs/0.9.1/spark-standalone.html
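To connect this back to the word count in the question, here is a minimal sketch of the same job run against a "local" master, assuming only the spark-core dependency is on the classpath. The object name LocalWordCount and the helper wordCount are hypothetical names used for illustration, and the input is the plain text from the question without its JSON wrapping.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversions that add reduceByKey to pair RDDs

// Hypothetical object and method names; only the SparkContext/RDD calls are Spark API.
object LocalWordCount {
  // Counts words in the given lines using an in-process ("local") master,
  // so no cluster is contacted.
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val sc = new SparkContext("local", "Local Word Count")
    try {
      sc.parallelize(lines)
        .flatMap(line => line.trim.split("[ ]+")) // same delimiter as the question's mapper
        .filter(_.nonEmpty)                       // drop the empty token an empty line yields
        .map(word => (word, 1))                   // plays the role of emitIntermediate(w, 1)
        .reduceByKey(_ + _)                       // plays the role of the summing reducer
        .collect()
        .toMap
    } finally {
      sc.stop() // release the context even if the job fails
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq(
      "test1 here is another test1 test1 ",
      "test2 here is another test2 test1 ")
    wordCount(data).toSeq.sortBy(_._1).foreach { case (w, n) => println(w + "->" + n) }
  }
}
```

Running main from the IDE should print one count per word, matching the sizes shown in the question's worksheet output (for example test1->4). Wrapping the job in a function that takes a Seq makes it easy to exercise with small in-memory inputs before pointing it at real files.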