I have ~40 GB of data (~80 million records, text, only 2 columns) and ran a count distinct on it. It runs successfully on an r5a.4xlarge instance on AWS and takes approximately 3 minutes to return the result. However, when I switch to a larger instance, r5a.12xlarge, I get a "Too many open files" error running the same code. I tried several different configurations for the Spark session, but none worked. I also increased the Linux limit on open files to 4096, with no change. Below are the Spark session code and the first part of the error.
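For reference, the open-file limit that the driver process actually sees can be confirmed from inside Python with the standard resource module (a side check only, not part of the Spark configuration):

import resource

# Soft and hard limits on open file descriptors for this process.
# `ulimit -n` only affects the shell it was run in and its children,
# so the Spark driver may still be running with the old limit.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open files: soft={soft}, hard={hard}")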
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession
         .builder
         .appName('Project_name')
         .config('spark.executor.memory', "42G")             # Tried 19G to 60G
         .config('spark.executor.instances', "4")            # Tried 1 to 5
         .config('spark.executor.cores', "4")                # Tried 1 to 5
         .config("spark.dynamicAllocation.enabled", "true")  # Also tried without dynamic allocation
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "5")
         .config('spark.driver.memory', "42G")               # Tried 19G to 60G
         .config('spark.driver.maxResultSize', '10G')        # Tried 1G to 10G
         .config('spark.worker.cleanup.enabled', 'True')
         .config("spark.local.dir", "/tmp/spark-temp")
         .getOrCreate())
Error:
>>> data.select(f.countDistinct("column_name")).show()
Py4JJavaError: An error occurred while calling o315.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 5.0 failed 1 times, most recent failure: Lost task 20.0 in stage 5.0 (TID 64, localhost, executor driver): java.io.FileNotFoundException: /tmp/spark-temp/blockmgr-c2f18891-a868-42ba-9075-dc145faaa4c4/16/temp_shuffle_f9c96d48-336d-423a-9edd-dcb9af5705a7 (Too many open files)
Any thoughts?
Since the file is large, Spark splits it into 292 partitions when reading it (292 * 128 MB ≈ 40 GB, 128 MB being the default partition size). By default, spark.sql.shuffle.partitions is set to 200, so you just need to raise it to a number higher than the number of input partitions. Additionally, you can cache the data in memory for better performance (a short sketch of that follows at the end).
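Both numbers are easy to verify before changing anything. A minimal check, assuming data is the DataFrame already read from the file:

# Number of partitions Spark created when reading the input
print(data.rdd.getNumPartitions())                     # ~292 for ~40 GB at 128 MB per partition

# Current shuffle-partition setting (defaults to '200')
print(spark.conf.get("spark.sql.shuffle.partitions"))

With that confirmed, the adjusted session simply raises the shuffle partitions above 292: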
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = (SparkSession
         .builder
         .appName('Project_name')
         .config('spark.executor.memory', "20G")
         .config('spark.driver.memory', "20G")
         .config('spark.driver.maxResultSize', '10G')
         .config('spark.sql.shuffle.partitions', 300)  # Increase SQL shuffle partitions above the 292 input partitions
         .config('spark.worker.cleanup.enabled', 'True')
         .config("spark.local.dir", "/tmp/spark-temp")
         .getOrCreate())
>>> data.select(f.countDistinct("column_name")).show() # Result in ~2min
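Caching, as mentioned above, is optional but can help when the data is queried more than once. A minimal sketch, assuming the DataFrame fits in memory:

data = data.cache()                                   # Mark the DataFrame for in-memory caching
data.count()                                          # Materialize the cache with a first action
data.select(f.countDistinct("column_name")).show()    # Subsequent queries read from memory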