
Having Spark process partitions concurrently, using a single dev/test machine

I'm naively testing for concurrency in local mode, with the following Spark session:

SparkSession
      .builder
      .appName("local-mode-spark")
      .master("local[*]")
      .config("spark.executor.instances", 4)
      .config("spark.executor.cores", 2)
      .config("spark.network.timeout", "10000001") // generous timeout so executors aren't shut down while paused in a debugger; remove otherwise
      .config("spark.executor.heartbeatInterval", "10000000") // same rationale; remove otherwise
      .getOrCreate()

and a mapPartitions API call like the following:

import spark.implicits._ 

val inputDF : DataFrame = spark.read.parquet(inputFile)

val resultDF : DataFrame =
    inputDF.as[T].mapPartitions(sparkIterator => new MyIterator).toDF

This did in fact surface one concurrency bug in my code contained in MyIterator (not a bug in Spark's code). However, I'd like my application to crunch all available machine resources both in production and during this testing, so that the chances of spotting additional concurrency bugs improve.

That is clearly not the case for me so far: my machine sits at very low CPU utilization throughout the heavy processing of inputDF, while there's plenty of free RAM and the JVM Xmx poses no real limitation.

How would you recommend testing for concurrency on a local machine? The objective is to verify that in production, Spark will not bump into thread-safety or other concurrency issues in the code it runs from within MyIterator.

Can Spark, even in local mode, process separate partitions of my input dataframe in parallel? Can I get Spark to work concurrently on the same dataframe on a single machine, preferably in local mode?

asked Dec 14 '22 by matanster

2 Answers

  1. Max parallelism

You are already running Spark in local mode using .master("local[*]").

local[*] uses as many threads as the number of processors available to the Java virtual machine (it uses Runtime.getRuntime.availableProcessors() to know the number).
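To see the exact thread budget `local[*]` yields on a given machine, you can query the JVM directly. A minimal sketch (the object name is illustrative), which also notes how to pin an explicit count instead:

```scala
// Print the core count that local[*] would use on this machine.
object CoreCount {
  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors()
    println(s"local[*] would run up to $cores concurrent tasks")
    // To pin an explicit thread count instead of relying on detection,
    // build the session with e.g. .master("local[8]").
  }
}
```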

  2. Max memory available to all executors/threads

I see that you are not setting the driver memory explicitly. By default the driver memory is 512M. If your local machine can spare more than this, set it explicitly. You can do that by either:

  1. setting it in the properties file (default is spark-defaults.conf),

    spark.driver.memory              5g
    
  2. or by supplying the configuration setting at runtime:

    $ ./bin/spark-shell --driver-memory 5g
    

Note that this cannot be achieved by setting it in the application, because by then it is already too late: the driver process has already started with some fixed amount of memory.

  3. Nature of the job

Check the number of partitions in your dataframe; that essentially determines the maximum parallelism you can use.

inputDF.rdd.partitions.size 

If the output of this is 1, your dataframe has only one partition, so you won't get concurrency when you operate on it. In that case, you might have to tweak some configuration to create more partitions so that tasks can run concurrently.
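As a sketch of that tweak (assuming `spark` and `inputDF` exist as in the question; `repartitionedDF` is an illustrative name), you could raise the partition count to the core count before the heavy mapPartitions step:

```scala
// Match the partition count to the cores local[*] will use,
// so every thread has at least one partition to work on.
val cores = Runtime.getRuntime.availableProcessors()
println(s"partitions before: ${inputDF.rdd.partitions.size}")

val repartitionedDF = inputDF.repartition(cores)
println(s"partitions after: ${repartitionedDF.rdd.partitions.size}")
// Run the mapPartitions call on repartitionedDF instead of inputDF.
```

Note that repartition triggers a shuffle; for a one-off test of parallelism that cost is usually acceptable.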

answered May 23 '23 by moriarty007


Running in local mode cannot simulate a production environment, for the following reasons.

  1. A lot of code that would normally run under any other cluster manager gets bypassed when running in local mode. Among various issues, a few that come to mind:
    a. Inability to detect bugs in the way shuffles are handled (shuffle data is handled in a completely different way in local mode).
    b. Serialization-related issues cannot be detected, since all code is available to the driver and tasks run inside the driver itself, so nothing ever actually needs to be serialized.
    c. No speculative tasks (especially for write operations).
    d. Networking-related issues: all tasks execute in the same JVM, so problems such as driver/executor communication or codegen-related issues cannot be detected.
  2. Concurrency in local mode
    a. The maximum concurrency that can be attained equals the number of cores on your local machine.
    b. The job, stage, and task metrics shown in the Spark UI are not accurate, since they incur the overhead of running in the same JVM as the driver.
    c. As for CPU/memory utilization, it depends on the operation being performed. Is the operation CPU- or memory-intensive?
  3. When to use local mode
    a. Testing code that will run only on the driver
    b. Basic sanity testing of the code that will execute on the executors
    c. Unit testing

tl;dr: The concurrency bugs that occur in local mode might not even be present under other cluster resource managers, since Spark has a lot of special handling for local mode (plenty of code paths check isLocal and branch into a different flow altogether).
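Given those limitations, one complementary option (a sketch of my own, not something Spark provides) is to stress the iterator logic outside Spark entirely, driving several instances concurrently on plain JVM threads, much as parallel tasks would call mapPartitions. The `MyIterator` below is a hypothetical stand-in for the real class under test:

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Stand-in for the question's MyIterator: doubles each element.
// Swap in the real iterator class under test.
class MyIterator(underlying: Iterator[Int]) extends Iterator[Int] {
  def hasNext: Boolean = underlying.hasNext
  def next(): Int = underlying.next() * 2
}

object IteratorStressTest {
  def main(args: Array[String]): Unit = {
    val threads = Runtime.getRuntime.availableProcessors()
    val pool = Executors.newFixedThreadPool(threads)
    // One "partition" per thread, all consumed concurrently,
    // mimicking Spark running mapPartitions tasks in parallel.
    val tasks = (1 to threads).map { _ =>
      new Callable[Long] {
        def call(): Long =
          new MyIterator((1 to 100000).iterator).foldLeft(0L)(_ + _)
      }
    }
    val results = tasks.map(pool.submit(_)).map(_.get())
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
    // Each partition sums to 2 * (1 + ... + 100000) = 10000100000.
    results.foreach(r => assert(r == 10000100000L))
    println("all partitions consistent")
  }
}
```

This cannot replace cluster testing either, but it removes the local-mode ceiling and makes thread-safety bugs in the iterator easier to reproduce deterministically.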

answered May 23 '23 by DaRkMaN