To compare the performance of Spark when using Python and Scala, I created the same job in both languages and compared the runtimes. I expected both jobs to take roughly the same amount of time, but the Python job took only 27 minutes, while the Scala job took 37 minutes (almost 40% longer!). I implemented the same job in Java as well and it also took 37 minutes. How is it possible that Python is so much faster?
Minimal verifiable example:
Python job:
import pyspark

# Configuration
conf = pyspark.SparkConf()
conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)
# 960 Files from a public dataset in 2 batches
input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"
# Count occurrences of a certain string
logData = sc.textFile(input_files)
logData2 = sc.textFile(input_files2)
a = logData.filter(lambda value: value.startswith('WARC-Type: response')).count()
b = logData2.filter(lambda value: value.startswith('WARC-Type: response')).count()
print(a, b)
Scala job:
import org.apache.spark.{SparkConf, SparkContext}

// Configuration
val config = new SparkConf()
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
// 960 Files from a public dataset in 2 batches
val input_files = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312025.20/warc/CC-MAIN-20190817203056-20190817225056-00[0-5]*"
val input_files2 = "s3a://commoncrawl/crawl-data/CC-MAIN-2019-35/segments/1566027312128.3/warc/CC-MAIN-20190817102624-20190817124624-00[0-3]*"
// Count occurrences of a certain string
val logData1 = sc.textFile(input_files)
val logData2 = sc.textFile(input_files2)
val num1 = logData1.filter(line => line.startsWith("WARC-Type: response")).count()
val num2 = logData2.filter(line => line.startsWith("WARC-Type: response")).count()
println(s"Lines with a: $num1, Lines with b: $num2")
Just by looking at the code, the two jobs seem to be identical. I looked at the DAGs and they didn't provide any insights (or at least I lack the know-how to come up with an explanation based on them).
I would really appreciate any pointers.
Scala is faster than Python because it is a statically typed language. If faster performance is a requirement, Scala is a good bet. Spark itself is written in Scala, which makes Scala the native way to write Spark jobs.
When it comes to raw performance, Scala is usually considered the winner over Python. One reason is that Scala is statically typed while Python is dynamically typed: with static typing, the compiler knows the type of every variable and expression at compile time.
Scala, a compiled language, is often quoted as roughly 10 times faster than interpreted Python, because the source code is translated into an efficient machine representation before runtime.
That said, Scala and PySpark should perform roughly equally for DataFrame operations. This thread has a dated performance comparison. “Regular” Scala code can run 10-20x faster than “regular” Python code, but PySpark isn't executed like regular Python code, so that comparison isn't relevant here.
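For the DataFrame point, this is easy to check: whichever frontend language you use, the filter is compiled into the same JVM execution plan. A minimal PySpark sketch (the input path is just a placeholder):
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("plan-check").getOrCreate()

# Placeholder path; any text source behaves the same way.
df = spark.read.text("s3a://some-bucket/some-prefix/*")

# The predicate becomes a Catalyst expression evaluated by the JVM,
# so the physical plan is the same whether it is written in Python or Scala.
df.filter(df.value.startswith("WARC-Type: response")).explain()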
Your basic assumption, that Scala or Java should be faster for this specific task, is just incorrect. You can easily verify it with minimal local applications. Scala one:
import scala.io.Source
import java.time.{Duration, Instant}

object App {
  def main(args: Array[String]): Unit = {
    val Array(filename, string) = args
    val start = Instant.now()
    Source
      .fromFile(filename)
      .getLines
      .filter(line => line.startsWith(string))
      .length
    val stop = Instant.now()
    val duration = Duration.between(start, stop).toMillis
    println(s"${start},${stop},${duration}")
  }
}
Python one:
import datetime
import sys

if __name__ == "__main__":
    _, filename, string = sys.argv
    start = datetime.datetime.now()
    with open(filename) as fr:
        # Not idiomatic or the most efficient, but that's what
        # PySpark will use
        sum(1 for _ in filter(lambda line: line.startswith(string), fr))
    end = datetime.datetime.now()
    duration = round((end - start).total_seconds() * 1000)
    print(f"{start},{end},{duration}")
Results (300 repetitions each, Python 3.7.6, Scala 2.11.12), on Posts.xml from the hermeneutics.stackexchange.com data dump, with a mix of matching and non-matching patterns:
As you can see, Python is not only systematically faster, but also more consistent (lower spread).
The takeaway message is: don't believe unsubstantiated FUD. Languages can be faster or slower on specific tasks or in specific environments (for example, here Scala can be hit by JVM startup and/or GC and/or JIT), but if you see claims like "XYZ is 4x faster" or "XYZ is slow as compared to ZYX (..) approximately 10x slower", it usually means that someone wrote really bad code to test things.
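For what it's worth, here is a minimal sketch of the repeated-measurement idea (the file name and prefix are placeholders, not the actual benchmark inputs, and this is not the exact harness used above): timing the same filter-and-count work many times lets you look at the spread instead of a single, noise-dominated run.
import timeit

def count_matches(filename, prefix):
    # Same shape of work as the benchmark programs above.
    with open(filename) as fr:
        return sum(1 for line in fr if line.startswith(prefix))

# Placeholder inputs; repeat the measurement and inspect the distribution.
timings = timeit.repeat(lambda: count_matches("Posts.xml", "  <row"),
                        repeat=30, number=1)
print(min(timings), sum(timings) / len(timings), max(timings))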
Edit:
To address some concerns raised in the comments:
Data between the JVM and Python workers is passed over local sockets (see local_connect_and_auth, and it is nothing more than a file associated with a socket). Again, this is about as cheap as it gets when it comes to communication between processes.
Edit 2:
Since jasper-m was concerned about startup cost here, one can easily prove that Python still has a significant advantage over Scala even if the input size is significantly increased.
Here are the results for 2003360 lines / 5.6G (the same input, just duplicated multiple times, 30 repetitions), which far exceeds anything you can expect from a single Spark task.
Please note the non-overlapping confidence intervals.
Edit 3:
To address another comment from Jasper-M:
The bulk of all the processing is still happening inside a JVM in the Spark case.
That is simply incorrect in this particular case:
PySpark RDDs (in contrast to, say, DataFrame) implement the bulk of their functionality natively in Python, with the exception of input, output and inter-node communication.
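A quick way to see this (a minimal sketch with made-up sample data, not part of the measurements above): the functions passed to RDD.filter or RDD.map are pickled and executed in Python worker processes, so they can call anything from the Python runtime, such as os.getpid.
import os
import pyspark

sc = pyspark.SparkContext.getOrCreate()

# Made-up sample data; any RDD of strings behaves the same way.
rdd = sc.parallelize(["WARC-Type: response", "WARC-Type: request"] * 1000, 4)

# The lambdas below are shipped to Python worker processes; the JVM only
# moves bytes around and never evaluates the predicate itself.
worker_pids = (rdd
               .filter(lambda line: line.startswith("WARC-Type: response"))
               .map(lambda _: os.getpid())
               .distinct()
               .collect())

print(worker_pids)   # PIDs of the Python worker processes
print(os.getpid())   # PID of the driver, different from the workers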
The Scala job takes longer because it has a misconfiguration and, as a result, the Python and Scala jobs were given unequal resources.
There are two mistakes in the code:
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
sc.hadoopConfiguration.set("spark.executor.instances", "4") // LINE #4
sc.hadoopConfiguration.set("spark.executor.cores", "8") // LINE #5
Lines #4 and #5: sc.hadoopConfiguration is the wrong place to set any Spark configuration. It should be set on the config instance you pass to new SparkContext(config) at line #1.
[ADDED] Bearing the above in mind, I would propose changing the code of the Scala job to
config.set("spark.executor.instances", "4")
config.set("spark.executor.cores", "8")
val sc = new SparkContext(config) // LINE #1
sc.setLogLevel("WARN")
sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
and re-test it. I bet the Scala version is going to be X times faster now.
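A cheap sanity check after the change (sketched here with the PySpark API to match the rest of the code in this thread; the Scala side has the analogous sc.getConf accessor) is to read the settings back from the running context and confirm they actually reached the SparkConf:
import pyspark

conf = pyspark.SparkConf()
conf.set("spark.executor.instances", "4")
conf.set("spark.executor.cores", "8")
sc = pyspark.SparkContext(conf=conf)

# If these properties had been set on sc.hadoopConfiguration instead,
# they would not show up in the SparkConf at all.
print(sc.getConf().get("spark.executor.instances"))  # expected: 4
print(sc.getConf().get("spark.executor.cores"))      # expected: 8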