 

spark sbt error: value toDF is not a member of Seq[DataRow]

I have some spark scala code that works without an issue in spark-shell.

The core of the issue lies in these few lines. I want to add an additional row to a DataFrame:

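import org.apache.spark.sql.SparkSession

object SparkPipeline {

  def main(args: Array[String]): Unit = {
    val spark = (SparkSession
        .builder()
        .appName("SparkPipeline")
        .getOrCreate()
        )
    import spark.implicits._  // imported, yet toDF below still fails

    val df = (spark
        .read
        .format("com.databricks.spark.avro")
        .load(DATA_PATH)  // DATA_PATH: path to the Avro data, defined elsewhere
        )

    // Case class declared inside main (a method-local type)
    case class DataRow(field1: String, field2: String)
    val row_df = Seq(DataRow("FOO", "BAR")).toDF()  // THIS FAILS
    val df_augmented = df.union(row_df)
    //
    // Additional code here
    //
  }
}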

However, when I use sbt to package it as a jar, compilation fails with the following error:

value toDF is not a member of Seq[DataRow]

As suggested in this question, I tried:

val spark = (SparkSession
    .builder()
    .appName("TrainSimpleRF")
    .getOrCreate()
    )

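val sc = spark.sparkContext
// new SQLContext(sc) is deprecated since Spark 2.0; spark.implicits._ is the SparkSession equivalent
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._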

This does not fix the issue. I also tried importing spark.implicits._, to no avail. All the code is in the main method. I think that might matter (as per some answers on the Cloudera forums), but I haven't managed to make it work regardless of how I structure the code.

How do I make it work? Any help would be more than welcome.

asked May 23 '17 at 20:05 by Manuel G


1 Answer

OK, I found the solution: as stated in this post, I just had to move the case class definition outside the main method, like this:

import org.apache.spark.sql.SparkSession

// Define the case class DataRow at the top level, before the object with the main method
case class DataRow(field1: String, field2: String)

object SparkPipeline {

  def main(args: Array[String]): Unit = {
    val spark = (SparkSession
        .builder()
        .appName("SparkPipeline")
        .getOrCreate()
        )
    import spark.implicits._  // provides toDF() on Seq

    val df = (spark
        .read
        .format("com.databricks.spark.avro")
        .load(DATA_PATH)  // DATA_PATH: path to the Avro data, defined elsewhere
        )

    val row_df = Seq(DataRow("FOO", "BAR")).toDF()  // now compiles
    val df_augmented = df.union(row_df)
    //
    // Additional code here
    //
  }
}
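A minimal self-contained sketch of the same fix, runnable without the Avro source, might look like the following. It assumes Spark 2.x or later on the classpath and a local master (`local[*]`), and it replaces the Avro input with a small in-memory DataFrame; the object name `SparkPipelineSketch`, the method `augment` and the sample values are hypothetical. Two things matter here: `DataRow` is declared at the top level, and `spark.implicits._` is imported once the session exists. As far as I understand it, `toDF()` reaches `Seq` through an implicit conversion that needs an `Encoder`, and Spark derives encoders for case classes via a `TypeTag`, which the compiler cannot provide for a case class declared inside a method.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Top-level case class: the compiler can supply a TypeTag, so Spark can derive an Encoder.
case class DataRow(field1: String, field2: String)

object SparkPipelineSketch {

  // Hypothetical helper: appends one DataRow to an in-memory stand-in for the Avro data.
  def augment(spark: SparkSession): DataFrame = {
    import spark.implicits._ // brings toDF() for Seq[DataRow] into scope

    // Assumption: the real input has the same two string columns, in this order.
    val df = Seq(DataRow("a", "b"), DataRow("c", "d")).toDF()

    val rowDf = Seq(DataRow("FOO", "BAR")).toDF()
    df.union(rowDf)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkPipelineSketch")
      .master("local[*]") // assumption: local run for testing
      .getOrCreate()
    try {
      val augmented = augment(spark)
      augmented.show()
      println(augmented.count()) // 3 rows: two original plus the appended one
    } finally {
      spark.stop()
    }
  }
}
```

Note that `union` matches columns by position, not by name, so the appended row must list its fields in the same order as the existing DataFrame's columns.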

It took me a while to process that answer, but it's there.
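If you would rather keep everything inside `main`, one commonly used alternative (not the approach from the post above) is to skip the case class and build the row from a tuple, naming the columns with `toDF("field1", "field2")`. Tuples are library types, so `spark.implicits._` can derive an encoder for them wherever they are used. This is a sketch assuming Spark 2.x or later and a local master; the object name `TupleRowSketch` and the method `oneRow` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TupleRowSketch {

  // Hypothetical helper: builds a one-row DataFrame without declaring any case class.
  def oneRow(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // A (String, String) tuple gets an encoder from spark.implicits._;
    // toDF(...) assigns the column names in order.
    Seq(("FOO", "BAR")).toDF("field1", "field2")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TupleRowSketch")
      .master("local[*]") // assumption: local run for testing
      .getOrCreate()
    try oneRow(spark).show()
    finally spark.stop()
  }
}
```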

answered Sep 27 '22 at 17:09 by Manuel G