I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState()
to keep track of seen data from previous batches.
The state is split into 20 partitions, created with StateSpec.function(trackStateFunc _).numPartitions(20). I had hoped to distribute the state throughout the cluster, but it seems that each node holds the complete state and execution is always performed on exactly one node.
Locality Level Summary: Node local: 50
is shown in the UI for each batch, and the complete batch appears as shuffle read. Afterwards, I write to Kafka and the partitions are spread across the cluster again. I can't seem to find out why mapWithState() needs to be run on a single node. Doesn't this defeat the purpose of partitioning the state if it is limited to one node instead of the whole cluster? Shouldn't it be possible to distribute the state by key?
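For reference, a minimal sketch of the kind of setup described above (Spark 1.6 Streaming API). Only the StateSpec.function(trackStateFunc _).numPartitions(20) call comes from the question; the socket source, the key/value types, the running-total logic, and the checkpoint path are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Runs per key and batch; keeps a running total per key (placeholder logic).
def trackStateFunc(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
  state.update(sum)
  Some((key, sum))
}

val ssc = new StreamingContext(new SparkConf().setAppName("StatefulStream"), Seconds(10))
ssc.checkpoint("/tmp/checkpoint")            // mapWithState requires checkpointing

val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))   // placeholder source

// The state is keyed by the pair's key and split into 20 partitions.
val stateStream = pairs.mapWithState(StateSpec.function(trackStateFunc _).numPartitions(20))

stateStream.print()
ssc.start()
ssc.awaitTermination()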
Spark RDDs trigger a shuffle for several operations, such as repartition(), groupByKey(), reduceByKey(), cogroup(), and join(), but not countByKey().
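A tiny illustration of that list with made-up local data; the comments note which operations repartition data by key.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("ShuffleDemo").setMaster("local[2]"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

val repartitioned = pairs.repartition(10)        // explicit shuffle
val grouped       = pairs.groupByKey()           // shuffles all values per key
val reduced       = pairs.reduceByKey(_ + _)     // shuffles, but combines map-side first
val joined        = reduced.join(grouped)        // shuffles to co-locate matching keys
val counts        = pairs.countByKey()           // action: returns a Map to the driver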
The mapWithState function takes three parameters: the key (any type), the new value (wrapped in an Option), and the state (a State object). Each of them matters for the state lifecycle: if the new value is None, the state for that key has expired (provided a timeout was specified).
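A hedged sketch of such a function, extending the trackStateFunc above with the timeout branch; the concrete types and running-total logic are assumptions for illustration.

import org.apache.spark.streaming.State

def trackStateFunc(key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  if (state.isTimingOut()) {
    // value is None here: the key expired because the StateSpec timeout elapsed,
    // and the state may no longer be updated.
    None
  } else {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    state.update(sum)                  // keep the running total for this key
    Some((key, sum))
  }
}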
Skew happens when one value dominates the partitioning key (for example, null). All rows with the same partitioning key value must be processed by the same worker node, so if 70% of the rows have a null partitioning key, one node will receive at least 70% of the rows.
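A rough, self-contained illustration of that skew with Spark's HashPartitioner (the key values are made up): every row carrying the dominant key is routed to one and the same partition.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(20)
val keys = Seq("user-1", "user-2", null, null, null)   // null-heavy key column (made up)

keys.foreach { k =>
  // HashPartitioner sends null (or any other single dominant value) to one fixed partition
  println(s"key=$k -> partition ${partitioner.getPartition(k)}")
}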
By default, Spark creates one partition for each block of the file (blocks are 128 MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value (for example, the minPartitions argument of sc.textFile). Note that you cannot have fewer partitions than blocks.
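For example (the HDFS path below is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("PartitionDemo").setMaster("local[2]"))

val byBlock = sc.textFile("hdfs:///data/events.log")        // roughly one partition per 128 MB block
val wider   = sc.textFile("hdfs:///data/events.log", 200)   // ask for at least 200 partitions

println(s"default: ${byBlock.partitions.length}, requested: ${wider.partitions.length}")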
"I can't seem to find out why mapWithState needs to be run on a single node"
It doesn't. By default, Spark uses a HashPartitioner to partition your keys among the different worker nodes in your cluster. If for some reason you're seeing all your data end up on a single node, check the distribution of your keys. If you're using a custom object as the key, make sure its hashCode method is implemented properly; a skewed or broken key distribution produces exactly this behavior. If you'd like to test this, try using random numbers as your keys, then look at the Spark UI and see whether the behavior changes.
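A sketch of both checks suggested here; the key classes and names below are made-up examples, not code from the question.

import scala.util.Random

// A case class derives equals/hashCode structurally, so HashPartitioner can
// spread distinct keys across partitions.
case class DeviceKey(tenant: String, deviceId: Long)

// A hand-written key with a broken (constant) hashCode sends every record to
// the same partition -- the symptom described in the question.
class BadKey(val id: Long) {
  override def hashCode(): Int = 1
  override def equals(other: Any): Boolean = other match {
    case b: BadKey => b.id == id
    case _         => false
  }
}

// Diagnostic: temporarily replace the real key with a random one and watch the
// locality / shuffle-read picture in the Spark UI change.
def randomizeKey[V](pair: (DeviceKey, V)): (Long, V) = (Random.nextLong(), pair._2)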
I'm running mapWithState myself and the data coming in is partitioned based on the key, as I also have a reduceByKey call prior to holding the state; when looking at the Storage tab in the Spark UI, I can see the different RDDs being stored on different worker nodes in the cluster.
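One way (sketched, with hypothetical type parameters) to double-check that the state really is spread by key is to inspect the partitioner and per-partition sizes of the state snapshots produced by mapWithState.

import org.apache.spark.streaming.dstream.MapWithStateDStream

// `stateStream` stands in for the result of mapWithState from the earlier sketch.
val stateStream: MapWithStateDStream[String, Int, Long, (String, Long)] = ???

stateStream.stateSnapshots().foreachRDD { rdd =>
  println(s"partitioner = ${rdd.partitioner}, partitions = ${rdd.partitions.length}")
  rdd.mapPartitionsWithIndex { (i, it) => Iterator((i, it.size)) }
     .collect()
     .foreach { case (i, n) => println(s"partition $i holds $n state entries") }
}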