 

Spark code organization and best practices [closed]

So, having spent many years in an object-oriented world with code reuse, design patterns and best practices always taken into account, I find myself struggling somewhat with code organization and code reuse in the world of Spark.

If I try to write code in a reusable way, it nearly always comes with a performance cost, and I end up rewriting it to whatever is optimal for my particular use case. This constant "write what is optimal for this particular use case" also affects code organization, because splitting code into different objects or modules is difficult when "it all really belongs together", and I thus end up with very few "God" objects containing long chains of complex transformations. In fact, I frequently think that if I had taken a look at most of the Spark code I'm writing now back when I was working in the object-oriented world, I would have winced and dismissed it as "spaghetti code".

I have searched the internet trying to find some sort of equivalent to the best practices of the object-oriented world, but without much luck. I can find some "best practices" for functional programming, but Spark just adds an extra layer, because performance is such a major factor here.

So my question to you is, have any of you Spark gurus found some best practices for writing Spark code that you can recommend?

EDIT

As written in a comment, I did not actually expect anyone to post an answer on how to solve this problem. Rather, I was hoping that someone in this community had come across some Martin Fowler type who had written some articles or blog posts somewhere on how to address problems with code organization in the world of Spark.

@DanielDarabos suggested that I might put in an example of a situation where code organization and performance are conflicting. While I frequently have issues with this in my everyday work, I find it a bit hard to boil it down to a good minimal example ;) but I will try.

In the object-oriented world, I'm a big fan of the Single Responsibility Principle (SRP), so I would make sure that my methods were only responsible for one thing. It makes them reusable and easily testable. So if I had to, say, calculate the sum of some numbers in a list (matching some criteria) and I also had to calculate the average of the same numbers, I would most definitely create two methods - one that calculates the sum and one that calculates the average. Like this:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK"))
  println("Averaged weights for DK = " + averagedWeights(list, "DK"))
}

def summedWeights(list: List[(String, Double)], country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List[(String, Double)], country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country)

  filteredByCountry.map(_._2).sum / filteredByCountry.length
}

I can of course continue to honor SRP in Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK", sqlContext))
  println("Averaged weights for DK = " + averagedWeights(df, "DK", sqlContext))
}

def averagedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val averagedWeight = countrySpecific.agg(avg('weight))

  averagedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

But because my df may contain billions of rows, I would rather not have to perform the filter twice. In fact, performance is directly coupled to EMR cost, so I REALLY don't want that. To overcome it, I thus decide to violate SRP and simply put the two functions in one, making sure I call persist on the country-filtered DataFrame, like this:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import org.apache.spark.storage.StorageLevel
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Now, this example is of course a huge simplification of what's encountered in real life. Here I could simply solve it by filtering and persisting df before handing it to the sum and avg functions (which would also be more SRP), but in real life there may be a number of intermediate calculations going on that are needed again and again. In other words, the filter function here is merely an attempt to make a simple example of something that will benefit from being persisted. In fact, I think the call to persist is the key point here. Calling persist will vastly speed up my job, but the cost is that I have to tightly couple all code that depends on the persisted DataFrame - even if the pieces are logically separate.
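To make that alternative concrete, here is a rough sketch of the more SRP-friendly variant: filter and persist up front, then hand the already-cached DataFrame to small single-purpose functions. The helper name filterAndPersistByCountry is just made up for illustration, and the usage part assumes the same df and sqlContext as in the main method above.

def filterAndPersistByCountry(df: DataFrame, country: String, sqlContext: SQLContext): DataFrame = {
  import org.apache.spark.storage.StorageLevel
  import sqlContext.implicits._

  // Filter once and cache the result so downstream aggregations can reuse it.
  df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
}

// Each function now has a single responsibility and simply assumes its input is already persisted.
def summedWeights(countrySpecific: DataFrame): Double = {
  import org.apache.spark.sql.functions._
  countrySpecific.agg(sum("weight")).first().getDouble(0)
}

def averagedWeights(countrySpecific: DataFrame): Double = {
  import org.apache.spark.sql.functions._
  countrySpecific.agg(avg("weight")).first().getDouble(0)
}

// Usage: the coupling to persist now lives in the caller instead of in the calculations.
val dk = filterAndPersistByCountry(df, "DK", sqlContext)
val summed = summedWeights(dk)
val averaged = averagedWeights(dk)
dk.unpersist()

But as described above, this only pushes the coupling out one level; in a real pipeline with many intermediate results it becomes much harder to keep the pieces this cleanly separated.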

asked Sep 25 '15 by Glennie Helles Sindholt




1 Answer

I think you can subscribe to the Apache Spark and Databricks channels on YouTube, listen more and learn more, especially from the experiences and lessons of others:

  • Apache Spark
  • databricks
  • Spark Technology Center

Here are some recommended videos:

  • SparkUI Visualization
  • slide SparkUI Visualization

  • Spark in Production: Lessons from 100+ Production Users

  • slide Spark in Production: Lessons from 100+ Production Users

  • Spark Tuning for Enterprise System Administrators

  • slide Spark Tuning for Enterprise System Administrators

  • Building, Debugging, and Tuning Spark Machine Learning Pipelines - Joseph Bradley (Databricks)

  • slide Building, Debugging, and Tuning Spark Machine Learning Pipelines

  • Top 5 Mistakes When Writing Spark Applications

  • slide Top 5 mistakes when writing Spark applications

  • Tuning and Debugging Apache Spark

  • slide Tuning and Debugging Apache Spark

  • A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)

  • slide A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)

I've also posted this list, and I'm still updating it, on my GitHub and blog:

  • github post
  • blog post

Hope this can help you ~

answered Sep 29 '22 by taotao.li