
Spark SQL DataFrame - import sqlContext.implicits._

I have a main that creates the Spark context:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

Then it creates a DataFrame and does filters and validations on it.

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

    val df = sqlContext.read.schema(struct).format("com.databricks.spark.csv").load(args(0))
      // drop rows with fewer than 3 non-null values
      .na.drop(3)
      // round times down to the hour
      .withColumn("time", convertToHourly($"time"))

This works great.

BUT when I try moving my validations to another file, by sending the DataFrame to

    def ValidateAndTransform(df: DataFrame): DataFrame = {...}

which gets the DataFrame and does the validations and transformations, it seems like I need the

    import sqlContext.implicits._

to avoid the error “value $ is not a member of StringContext” that happens on the line

    .withColumn("time", convertToHourly($"time"))
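(For context: the $"..." syntax comes from an implicit conversion on StringContext that sqlContext.implicits._ brings into scope. A minimal sketch of a workaround, reusing the udf from above with a hypothetical helper name roundToHour: col(...) from org.apache.spark.sql.functions names the same column without any implicits import.)

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, udf}

    val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

    // col("time") is equivalent to $"time" but needs no implicits import
    def roundToHour(df: DataFrame): DataFrame =
      df.withColumn("time", convertToHourly(col("time")))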

But to use import sqlContext.implicits._ I also need the sqlContext, either defined in the new file, like so:

    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

or passed to the function:

    def ValidateAndTransform(df: DataFrame): DataFrame = {...}
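(A sketch of that second option, with a hypothetical placeholder body: the context travels as an extra parameter and its implicits are imported locally inside the method.)

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def ValidateAndTransform(df: DataFrame, sqlContext: SQLContext): DataFrame = {
      import sqlContext.implicits._  // $"..." now resolves in this scope
      df.filter($"time".isNotNull)   // placeholder for the real validations
    }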

I feel like the separation I'm trying to achieve between the two files (main & validation) isn't done correctly...

Any idea on how to design this? Or should I simply pass the sqlContext to the function?

Thanks!

Etti Gur asked Sep 08 '15




1 Answer

You can work with a singleton instance of the SQLContext. Take a look at this example from the Spark repository:

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
...
// And wherever you need it, you can do
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._  
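
For example, the validation file could then look something like this (a sketch assuming the ValidateAndTransform signature and the convertToHourly udf from the question; the object name Validation is hypothetical):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.udf

    object Validation {
      def ValidateAndTransform(df: DataFrame): DataFrame = {
        // reuse the singleton instead of constructing a second SQLContext
        val sqlContext = SQLContextSingleton.getInstance(df.rdd.sparkContext)
        import sqlContext.implicits._

        val convertToHourly = udf((time: String) => time.substring(0, time.indexOf(':')) + ":00:00")

        df.na.drop(3)
          .withColumn("time", convertToHourly($"time"))
      }
    }

This keeps the two files from each constructing their own SQLContext: both resolve the same lazily created instance.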
Marco answered Dec 28 '22