In the toy examples that show how to program in Spark, everything is easy and simple: you import, create, use and discard the contexts, all in one little function.
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from hdfs
  val df1 = sc.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from hive
  val df2 = sql("select * from mytable")

  // transform df2 with df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...
  sc.stop()
}
In the real world, I find myself writing quite a few functions to modularize the tasks. For example, I would have a few functions just to load the different data tables. And in these load functions I would call other functions to do necessary data cleaning/transformation as I load the data. Then I would pass the contexts like so:
def loadHdfsFileAndBroadcast(sc: SparkContext) = {
  // use sc here
  val df = sc.textFile("hdfs://.../myfile.csv").map(...)
  val dfB = sc.broadcast(df)
  dfB
}
def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
  import hiveContext.implicits._
  val data = hiveContext.sql("select * from myHiveTable")
  // data cleaning
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df_cleaned = data.withColumn("myCol", cleanCol)
  df_cleaned
}
As you can see, the load function signatures get heavy quite easily.
I've tried putting these context imports at the class level, outside the main function, but that causes problems (see this issue), which leaves me no option but to pass them around.
Is this the way to go or is there a better way to do this?
If you really need to work with multiple Spark contexts, you can turn on the special option [MultipleContexts] (1), but it is only used for Spark's internal tests and is not supposed to be used in user programs. You will get unexpected behavior while running more than one SparkContext in a single JVM [SPARK-2243] (2).
Note: you can have multiple Spark contexts by setting spark.driver.allowMultipleContexts to true, but having multiple Spark contexts in the same JVM is not encouraged and is not considered good practice, as it makes the application less stable and the crash of one Spark context can affect the others.
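For illustration only, a minimal sketch of what enabling that flag looks like (conf2/sc2 are placeholder names; again, this is discouraged outside Spark's own tests):
val conf2 = new SparkConf()
  .setAppName("secondContext")
  .set("spark.driver.allowMultipleContexts", "true") // behavior with multiple contexts is undefined (SPARK-2243)
val sc2 = new SparkContext(conf2)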
First, let me say I'm glad that someone is exploring writing clean code in Spark. That is something I always find critical, but it always seems like people are so focused on the analytics themselves they lose sight of maintainability.
I do also agree Spark produces interesting challenges in that regard. The best way I've found, and of course you might feel this isn't an improvement, is to use traits with abstract method definitions and mix those into the object that orchestrates everything.
For example:
trait UsingSparkContextTrait {
  def sc: SparkContext

  def loadHdfsFileAndBroadcast = {
    val df = sc.textFile("hdfs://.../myfile.csv").map(...)
    sc.broadcast(df)
  }
}
trait UsingHiveContextTrait {
  def hiveContext: HiveContext
  def df1B: Broadcast[Map[String, String]]

  def loadHiveTable = {
    val data = hiveContext.sql("select * from myHiveTable")
    val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
    data.withColumn("myCol", cleanCol)
  }
}
And then finally:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)       // satisfies sc in UsingSparkContextTrait
  val sqlContext = new SQLContext(sc)
  val hiveContext = new HiveContext(sc) // satisfies hiveContext in UsingHiveContextTrait
  val df1B = loadHdfsFileAndBroadcast   // satisfies df1B in UsingHiveContextTrait

  import hiveContext.implicits._
  import hiveContext.sql

  def doAnalytics = {
    val dfCleaned = loadHiveTable
    ...
  }
}
The cool thing about this dependency injection-ish approach is that you will know at compile-time if you are missing any of the components you need for your code to execute.
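For example, a hypothetical class that mixes in UsingSparkContextTrait but never defines sc is rejected by the compiler (it would have to be declared abstract), rather than failing at runtime:
// Hypothetical illustration: this does not compile, because the abstract
// member `sc` from UsingSparkContextTrait is never implemented.
class Broken extends UsingSparkContextTrait {
  def doWork = loadHdfsFileAndBroadcast
  // missing: val sc = ...
}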
Finally, on a much simpler note, you can also access the SparkContext from an RDD instance with rdd.context. That could prove useful too.
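A quick sketch, assuming myRdd is any RDD you already have in scope:
// Recover the owning SparkContext from an existing RDD instead of passing it in.
val scFromRdd: SparkContext = myRdd.context // rdd.sparkContext is equivalent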
If all of your methods are defined in a single object/class, you could make the contexts belong to the object/class and always reference the global instance. If you provide it in the constructor you can even safely import only once and have access to the methods everywhere in your class/object.
For instance, if the contexts are declared as implicit vals in the calling object:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object testObject {
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[2]").setAppName("testObj")
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)

    implicit val sc = new SparkContext(sconf)
    implicit val sqlContext = new SQLContext(sc)
    new foo().run()
  }
}
you can then use them in the class below, which actually holds your logic:
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

case class OneVal(value: String)

class foo(implicit val sc: SparkContext, implicit val sqlC: SQLContext) {
  import sqlC.implicits._

  def run(): Unit = {
    doStuff().show(1)
    doOtherStuff().show(1)
  }

  def doStuff(): DataFrame = {
    sc.parallelize(List(OneVal("test"))).toDF()
  }

  def doOtherStuff(): DataFrame = {
    sc.parallelize(List(OneVal("differentTest"))).toDF()
  }
}
In this example, toDF is the method made available implicitly by import sqlC.implicits._.
When run, this gives the output below, as expected:
+-----+
|value|
+-----+
| test|
+-----+
+-------------+
| value|
+-------------+
|differentTest|
+-------------+