 

Spark: Why does Python significantly outperform Scala in my use case?

To compare the performance of Spark when using Python and Scala, I created the same job in both languages and compared the runtimes. I expected both jobs to take roughly the same amount of time, but the Python job took only 27 minutes, while the Scala job took 37 minutes (almost 40% longer!). I implemented the same job in Java as well, and it also took 37 minutes. How is it possible that Python is so much faster?

Minimal verifiable example:

Python job:

# Configuration
import pyspark

conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

# Count occurrences of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()

print(a, b)

Scala job:

// Configuration
import org.apache.spark.{SparkConf, SparkContext}

val config = new SparkConf()
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

// 960 Files from a public dataset in 2 batches 
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"

// Count occurrences of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()

println(s"Lines with a: $num1, Lines with b: $num2")

Just by looking at the code, the two jobs seem to be identical. I looked at the DAGs, and they didn't provide any insights (or at least I lack the know-how to come up with an explanation based on them).

I would really appreciate any pointers.

asked Feb 23 '20 by maestromusica



2 Answers

Your basic assumption, that Scala or Java should be faster for this specific task, is just incorrect. You can easily verify this with minimal local applications. The Scala one:

import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]) {
    val Array(filename, string) = args

    val start = Instant.now()

    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length

    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}

The Python one:

import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))

    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")

Results (300 repetitions each, Python 3.7.6, Scala 2.11.12) on Posts.xml from the hermeneutics.stackexchange.com data dump, with a mix of matching and non-matching patterns:

[Figure: boxplots of duration in milliseconds for the two programs above]

  • Python 273.50 ms (258.84, 288.16)
  • Scala 634.13 ms (533.81, 734.45)

As you can see, Python is not only systematically faster, but also more consistent (lower spread).

The takeaway message is: don't believe unsubstantiated FUD. Languages can be faster or slower on specific tasks or in specific environments (for example, here Scala can be hit by JVM startup and/or GC and/or JIT), but if you see claims like "XYZ is 4x faster" or "XYZ is slow compared to ZYX (...) approximately 10x slower", it usually means that someone wrote really bad code to test things.
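For reference, repeated timings like the ones above can be collected with a small driver script along the following lines. This is a hypothetical sketch, not the harness actually used for the numbers above; the program names, commands, and search prefix are made up for illustration.

# Hypothetical timing harness: run each of the two programs above N times and
# collect the duration they print (the last comma-separated field, in milliseconds).
import statistics
import subprocess

def run_many(cmd, n=300):
    durations = []
    for _ in range(n):
        out = subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
        durations.append(float(out.strip().split(",")[-1]))
    return durations

python_ms = run_many(["python3", "count_prefix.py", "Posts.xml", "<row"])
scala_ms = run_many(["scala", "-cp", ".", "App", "Posts.xml", "<row"])
print(statistics.mean(python_ms), statistics.mean(scala_ms))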

Edit:

To address some concerns raised in the comments:

  • In the OP's code, data is passed mostly in one direction (JVM -> Python) and no real serialization is required (this specific path just passes the bytestring as-is and decodes it as UTF-8 on the other side). That's as cheap as it gets when it comes to "serialization".
  • What is passed back is just a single integer per partition, so the impact in that direction is negligible.
  • Communication is done over local sockets (all communication on the worker beyond the initial connect and auth is performed using the file descriptor returned from local_connect_and_auth, which is nothing other than a socket-associated file). Again, that is as cheap as it gets when it comes to communication between processes.
  • Considering the difference in raw performance shown above (much larger than what you see in your program), there is a lot of margin for the overheads listed above.
  • This case is completely different from cases where simple or complex objects have to be passed to and from the Python interpreter in a form that is accessible to both parties as pickle-compatible dumps (the most notable examples include old-style UDFs and some parts of old-style MLlib); see the sketch after this list for the contrast.
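To make the last point concrete, here is a hedged sketch of the contrast (illustrative only, with a placeholder path, not the OP's exact code): in the RDD version the predicate runs in the Python workers over raw lines shipped from the JVM, while an equivalent DataFrame version expresses the predicate as a Catalyst expression and keeps the whole filter-and-count inside the JVM.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
path = "s3a://some-bucket/some-prefix/*"  # placeholder path for illustration

# RDD version: the lambda executes in the Python workers; the JVM only streams
# raw lines over a local socket and gets back one integer per partition.
rdd_count = (sc.textFile(path)
             .filter(lambda line: line.startswith("WARC-Type: response"))
             .count())

# DataFrame version: the predicate is a Catalyst expression, so filtering and
# counting are evaluated entirely inside the JVM, with no per-row Python code.
df_count = (spark.read.text(path)
            .filter(F.col("value").startswith("WARC-Type: response"))
            .count())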

Edit 2:

Since Jasper-M was concerned about startup cost here, one can easily show that Python still has a significant advantage over Scala even if the input size is significantly increased.

Here are the results for 2,003,360 lines / 5.6 GB (the same input, just duplicated multiple times, 30 repetitions), which far exceeds anything you can expect in a single Spark task.


  • Python 22809.57 ms (21466.26, 24152.87)
  • Scala 27315.28 ms (24367.24, 30263.31)

Please note the non-overlapping confidence intervals.

Edit 3:

To address another comment from Jasper-M:

"The bulk of all the processing is still happening inside a JVM in the Spark case."

That is simply incorrect in this particular case:

  • The job in question is a map job with a single global reduce, using PySpark RDDs.
  • PySpark RDDs (unlike, say, DataFrames) implement the bulk of their functionality natively in Python, with the exception of input, output, and inter-node communication.
  • Since it is a single-stage job, and the final output is small enough to be ignored, the main responsibility of the JVM (if one were to nitpick, this is implemented mostly in Java, not Scala) is to invoke the Hadoop input format and push data through the socket file to Python.
  • The read part is identical for the JVM and Python APIs, so it can be counted as constant overhead. It also doesn't qualify as the bulk of the processing, even for a job as simple as this one. A simplified sketch of the per-partition work follows this list.
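As promised above, here is a simplified sketch (not PySpark's actual source) of the per-partition work the Python worker does for textFile(...).filter(pred).count(); this is also why the local Python benchmark earlier counts with sum(1 for _ in filter(...)).

# Simplified sketch of the work a Python worker performs for one partition of
# textFile(...).filter(pred).count(): the JVM only feeds raw lines over a local
# socket; the predicate and the counting both run in CPython.
def count_partition(lines):
    pred = lambda line: line.startswith("WARC-Type: response")
    # A single integer per partition is sent back to the JVM, which adds them up.
    yield sum(1 for _ in filter(pred, lines))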
answered Oct 12 '22 by user10938362


The Scala job takes longer because of a misconfiguration; as a result, the Python and Scala jobs were provided with unequal resources.

There are two mistakes in the code:

val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
  1. LINE 1. Once this line has been executed, the resource configuration of the Spark job is already established and fixed. From that point on, there is no way to adjust anything: neither the number of executors nor the number of cores per executor.
  2. LINES 4-5. sc.hadoopConfiguration is the wrong place to set any Spark configuration. It should be set on the config instance you pass to new SparkContext(config).

[ADDED] Bearing the above in mind, I would propose changing the code of the Scala job to

config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

and re-testing it. I bet the Scala version is going to be X times faster now.
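Either way, it is worth verifying that the executor settings actually reached the context before comparing runtimes. A quick check could look like the sketch below, shown from the PySpark side (the Scala SparkContext exposes an equivalent getConf); settings applied too late, or via sc.hadoopConfiguration, will not show up here.

import pyspark

conf = pyspark.SparkConf()
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# Read back the configuration the running context actually uses.
print(sc.getConf().get("spark.executor.instances"))  # expected: "4"
print(sc.getConf().get("spark.executor.cores"))      # expected: "8"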

answered Oct 12 '22 by egordoe