
spark programming: best way to organize context imports and others with multiple functions

It's easy and simple in the toy examples that show how to program in Spark: you just import, create, use and discard, all in one little function.

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, udf}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  // load data from hdfs
  val df1 = sc.textFile("hdfs://.../myfile.csv").map(...)
  val df1B = sc.broadcast(df1)

  // load data from hive
  val df2 = sql("select * from mytable")
  // transform df2 with df1B
  val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
  val df2_new = df2.withColumn("myCol", cleanCol)

  ...

  sc.stop()
}

In the real world, I find myself writing quite a few functions to modularize the tasks. For example, I would have a few functions just to load the different data tables. And in these load functions I would call other functions to do necessary data cleaning/transformation as I load the data. Then I would pass the contexts like so:

 def loadHdfsFileAndBroadcast(sc: SparkContext) = {
   // use sc here
   val df = sc.textFile("hdfs://.../myfile.csv").map(...)
   val dfB = sc.broadcast(df)
   dfB
 }

 def loadHiveTable(hiveContext: HiveContext, df1B: Broadcast[Map[String, String]]) = {
   import hiveContext.implicits._
   val data = hiveContext.sql("select * from myHiveTable")
   // data cleaning
   val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
   val df_cleaned = data.withColumn("myCol", cleanCol)
   df_cleaned
 }

As you can see, the load function signatures get heavy quite easily.

I've tried putting these context creations and imports outside the main function, at class level. But that causes problems (see this issue), which leaves me no option but to pass them around.
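
Roughly the kind of layout I mean (a simplified sketch with a made-up object name, not my exact code):

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

object MyJob {
  // contexts created as fields so the imports can live at class/object level
  val conf = new SparkConf().setAppName("example")
  val sc = new SparkContext(conf)
  val hiveContext = new HiveContext(sc)
  import hiveContext.implicits._
  import hiveContext.sql

  def main(args: Array[String]): Unit = {
    // ... load and transform here, using sc, hiveContext and sql ...
    sc.stop()
  }
}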

Is this the way to go or is there a better way to do this?

asked Apr 04 '17 by breezymri




2 Answers

First, let me say I'm glad that someone is exploring how to write clean code in Spark. That is something I always find critical, but it seems people get so focused on the analytics themselves that they lose sight of maintainability.

I also agree that Spark produces interesting challenges in that regard. The best way I've found, and of course you might feel this isn't an improvement, is to use traits with abstract method definitions and mix those into the object that orchestrates everything.

For example:

trait UsingSparkContextTrait {
  def sc: SparkContext

  def loadHdfsFileAndBroadcast = {
    val df = sc.textFile("hdfs://.../myfile.csv").map(...)
    sc.broadcast(df)
  }
}

trait UsingHiveContextTrait {
  def hiveContext: HiveContext
  def df1B: Broadcast[Map[String, String]]

  def loadHiveTable = {
    val data = hiveContext.sql("select * from myHiveTable")
    val cleanCol = udf(cleanMyCol(df1B)).apply(col("myCol"))
    data.withColumn("myCol", cleanCol)
  }
}

And then finally:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.hive.HiveContext

class ClassDoingWork extends UsingSparkContextTrait with UsingHiveContextTrait {
   val conf = new SparkConf().setAppName("example")
   val sc = new SparkContext(conf) //Satisfies UsingSparkContextTrait
   val sqlContext = new SQLContext(sc)

   val hiveContext = new HiveContext(sc) //Satisfies UsingHiveContextTrait
   val df1B = loadHdfsFileAndBroadcast  //Satisfies UsingHiveContextTrait
   import hiveContext.implicits._
   import hiveContext.sql

   def doAnalytics = {
      val dfCleaned = loadHiveTable
      ...
   }
}

The cool thing about this dependency injection-ish approach is that you will know at compile-time if you are missing any of the components you need for your code to execute.
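
For instance, leaving out one of the abstract members is caught immediately (a hypothetical sketch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Hypothetical sketch: df1B from UsingHiveContextTrait is never defined,
// so this class does not compile -- the compiler requires it to be
// declared abstract because df1B has no implementation.
class IncompleteWork extends UsingHiveContextTrait {
  val conf = new SparkConf().setAppName("incomplete")
  val sc = new SparkContext(conf)
  val hiveContext = new HiveContext(sc)  // hiveContext is satisfied...
  // ...but df1B is missing, so the error shows up at compile time,
  // not halfway through a long-running job.
}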

Finally, on a much simpler note, you can also access the SparkContext from an RDD instance with rdd.context. That could prove useful too.
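
For instance, a helper like this (a hypothetical sketch) needs no SparkContext parameter at all:

import org.apache.spark.rdd.RDD
import org.apache.spark.broadcast.Broadcast

// Hypothetical helper: no SparkContext in the signature, because the
// RDD already carries a reference to the context that created it.
def broadcastDistinctKeys(rdd: RDD[(String, String)]): Broadcast[Set[String]] = {
  val sc = rdd.context  // the SparkContext behind this RDD
  sc.broadcast(rdd.map(_._1).distinct().collect().toSet)
}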

answered Oct 23 '22 by Vidya


If all of your methods are defined in a single object/class, you could make the contexts members of that object/class and always reference that single instance. If you provide them through the constructor, you can even safely import the implicits just once and have them available everywhere in your class/object.

For instance, if the contexts are defined as implicits in the calling object:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}

object testObject {
  def main(args: Array[String]): Unit = {
    val sconf = new SparkConf().setMaster("local[2]").setAppName("testObj")
    val rootLogger = Logger.getRootLogger
    rootLogger.setLevel(Level.ERROR)
    implicit val sc = new SparkContext(sconf)
    implicit val sqlContext = new SQLContext(sc)
    new foo().run()
  }
}

You can then use them in the class that actually holds your logic:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

case class OneVal(value: String)
class foo(implicit val sc: SparkContext, implicit val sqlC: SQLContext){
  import sqlC.implicits._
  def run(): Unit ={
    doStuff().show(1)
    doOtherStuff().show(1)
  }
  def doStuff(): DataFrame ={
    sc.parallelize(List(OneVal("test"))).toDF()
  }
  def doOtherStuff(): DataFrame ={
    sc.parallelize(List(OneVal("differentTest"))).toDF()
  }
}

In this example, toDF (brought into scope by import sqlC.implicits._) is the implicit method being used.

When run, this gives the output below, as expected:

+-----+
|value|
+-----+
| test|
+-----+

+-------------+
|        value|
+-------------+
|differentTest|
+-------------+
answered Oct 23 '22 by Davis Broda