
How to call separate logic for different file names in Spark

I have 3 log files in each of my folders, like:

foldera = emplog,deptlog,companylog
folderb = emplog,deptlog,companylog
folderc = emplog,deptlog,companylog

I have 3 different Scala program files to extract the data from each of them.

employee.scala
department.scala
companylog.scala

Each of them has code like below.

I want to combine all these files and execute them in a parallel manner.

package com.sample

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
import org.apache.spark.sql.SQLContext
import org.apache.log4j.{Level, Logger}

object logparser {
  def main(args: Array[String]) = {

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // Start the Spark context
    val conf = new SparkConf()
      .setAppName("Parser")
      .setMaster("local")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val test = sc.wholeTextFiles("C:\\mkdir\\*\\*")
      .map { l =>
        if (l._1.endsWith("emplog.txt")) {
          empparser(l._2, sc, sqlContext)
        }
        l
      }
      .foreach { println }
  }

  def empparser(record: String, sc: SparkContext, sqlContext: SQLContext) = {
    val emppattern = """[(](\d+)[)]\s([\w\s._]{30})\s+""".r

    import sqlContext.implicits._

    val indrecs = emppattern.findAllIn(record)
      .map { line =>
        val emppattern(eid, ename) = line
        (eid, ename)
      }
      .toSeq
      .toDF("eid", "ename")
      .show()
  }
}

I have tried putting each method within the same object in my code.

Now 2 questions arise. Q1: When I run it, I get

Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@6b0615ae)
    - field (class: com.sample.logparser$$anonfun$1, name: sc$1, type: class org.apache.spark.SparkContext)
    - object (class com.sample.logparser$$anonfun$1, <function1>)

As far as I know (I'm a newbie), the Spark context can't be serialized. If I don't pass sc as a parameter, I get a NullPointerException. How do I solve this?

Q2: I will insert the data into a Hive table within the empparser method after converting to a DataFrame. Once that is done, I don't want to do anything within my main. But my map code won't execute unless I have an action after it; that's why I have the foreach println there. Is there a way to overcome this issue?

asked Nov 09 '22 by user7264473

1 Answer

To attempt to answer the question, I'm going to assume that processing an employee or a department results in the same kind of record. I would expect this to be different for each kind of data, so I'm keeping the processing of the different record kinds separate to allow for this "adjustment to reality".

First, we define a record case class and a parser for each of the record types. (Here I'm copying the same implementation for the sake of simplicity.)

case class Record(id:String, name: String)

val empParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val deptParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}

val companyParser: String =>  Option[Record] = { record => 
  val pattern="""^[(](\d+)[)]\s([\w\s._]{30})\s+$""".r
  record match {
    case pattern(eid,ename) => Some(Record(eid, ename))
    case _ => None
  }
}
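
As a quick sanity check of a parser (the sample line below is hypothetical, padded so the name field matches the 30-character group the regex expects):

val name = "John Smith".padTo(30, ' ')   // pad the name out to the 30-character field the regex expects
val sampleLine = s"(1001) $name "        // hypothetical line: "(id) <30-char name> "
empParser(sampleLine)                    // Some(Record("1001", "John Smith" padded to 30 chars))
empParser("some malformed line")         // None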

We load the data using wholeTextFiles, which gives us (filename, content) pairs:

val dataPath = "/.../data/wholefiles/*/*"
val logFiles =  sc.wholeTextFiles(dataPath)

Then we process the different kinds of records by filtering the files down to the kind we require and applying the parser we defined above. Note how we are practically repeating the same process; this could be abstracted out.

val empLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("emplog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => empParser(line)) }

val deptLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("deptlog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => deptParser(line)) }

val compLogs = logFiles
  .filter { case (filename, _) => filename.endsWith("companylog.txt") }
  .flatMap { case (_, content) => content.split("\n").flatMap(line => companyParser(line)) }

We now convert to a DataFrame

val empDF  = empLogs.toDF

And we could do the same for the other record types as well.
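
Note that .toDF on an RDD of case classes needs the SQLContext implicits in scope (in a spark-shell they are already imported). To address Q2 from the question: writing the DataFrame out, for example to a Hive table, is itself an action, so no extra foreach(println) is needed to trigger the computation. A minimal sketch, assuming Spark 1.x with Hive support (spark-hive) on the classpath and a hypothetical table name emp_log:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // assumption: spark-hive is on the classpath
import hiveContext.implicits._          // brings .toDF into scope for RDDs of case classes

val empDF = empLogs.toDF                // columns "id" and "name", from the Record case class

empDF.write
  .mode(SaveMode.Append)                // append if the table already exists
  .saveAsTable("emp_log")               // hypothetical Hive table name; saveAsTable is itself an action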

There's plenty of room to reduce code duplication in this process, depending on whether we can find commonalities between the processing of the different data types.
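
For illustration, one way this repetition could be factored out into a single helper (the helper name parseLogs and its signature are my own, not part of the original answer):

// Hypothetical helper: keep only the (filename, content) pairs whose name ends with
// the given suffix, split each file into lines, and apply the given line parser.
def parseLogs(suffix: String, parser: String => Option[Record]) =
  logFiles
    .filter { case (filename, _) => filename.endsWith(suffix) }
    .flatMap { case (_, content) => content.split("\n").flatMap(line => parser(line)) }

// Same results as the three vals above, with only the suffix and parser varying per type
val empLogs  = parseLogs("emplog.txt", empParser)
val deptLogs = parseLogs("deptlog.txt", deptParser)
val compLogs = parseLogs("companylog.txt", companyParser)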

answered Nov 14 '22 by maasg