I can't find much documentation on guaranteeing partitioning order. I just want to ensure that, given a set of deterministic transformations (output rows always the same), partitions always receive the same set of elements if the underlying dataset doesn't change. Is that possible?
It doesn't need to be sorted: for example, after a set of transformations is applied to an RDD, it looks like this: (A, B, C, D, E, F, G).
And if my spark.default.parallelism were 2 or 3, the sets of elements would always be (A, B, C, D), (E, F, G) or (A, B), (C, D), (E, F, G), respectively.
This is because my executors will be producing side effects based on the partition/set of elements they are operating on, and I want to make sure that the Spark application is idempotent (same side effects if it restarts).
Edit: Apparently, DataFrame repartition is deterministic but RDD repartition is not (Spark 2.4.4).
def f1(rdds):
    # `rdds` is the iterator over the rows of a single partition;
    # `analysis_date` is assumed to be defined in the enclosing scope.
    rows = list(rdds)
    stats_summary = [{
        'origin': str(row['origin']),
        'dest': str(row['dest']),
        'start_time': analysis_date.isoformat(),
        'value': row['count']
    } for row in rows]
    stats_summary.sort(key=lambda t: (t['start_time'], t['origin'], t['dest']))
    # Summarize the partition (assumes each partition is non-empty).
    rtn = "partition size: {}, first: ({}, {}), last: ({}, {})".format(
        len(rows),
        stats_summary[0]["origin"], stats_summary[0]["dest"],
        stats_summary[-1]["origin"], stats_summary[-1]["dest"])
    return [rtn]
# Repartition at the RDD level (turned out to be non-deterministic across runs):
repartition_rdd_res = unq_statistics.rdd \
    .repartition(10) \
    .mapPartitions(f1) \
    .collect()

# Repartition at the DataFrame level (deterministic across runs):
repartition_df_res = unq_statistics.repartition(10) \
    .rdd \
    .mapPartitions(f1) \
    .collect()
repartition_rdd_res4 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
'partition size: 131209, first: (-1, 1014), last: (996, 996)',
'partition size: 131216, first: (-1, 1021), last: (999, 667)',
'partition size: 131218, first: (-1, 1008), last: (991, 1240)',
'partition size: 131222, first: (-1, 1001), last: (994, 992)',
'partition size: 131229, first: (-1, 1007), last: (994, 890)',
'partition size: 131233, first: (-1, 1004), last: (991, -1)',
'partition size: 131235, first: (-1, 1005), last: (999, 1197)',
'partition size: 131237, first: (-1, 100), last: (999, 997)',
'partition size: 131240, first: (-1, 1010), last: (994, -1)']
repartition_rdd_res3 = ['partition size: 131200, first: (-1, -1), last: (999, -1)',
'partition size: 131209, first: (-1, 1006), last: (994, 2048)',
'partition size: 131216, first: (-1, 1002), last: (996, 996)',
'partition size: 131218, first: (-1, 1017), last: (999, 667)',
'partition size: 131222, first: (-1, 1008), last: (994, 890)',
'partition size: 131229, first: (-1, 1000), last: (99, 96)',
'partition size: 131233, first: (-1, 1001), last: (994, 992)',
'partition size: 131235, first: (-1, 1009), last: (990, 1601)',
'partition size: 131237, first: (-1, 1004), last: (994, -1)',
'partition size: 131240, first: (-1, 1003), last: (999, 997)']
repartition_rdd_res2 = ['partition size: 131200, first: (-1, 1013), last: (991, 2248)',
'partition size: 131209, first: (-1, 1007), last: (999, 667)',
'partition size: 131216, first: (-1, 100), last: (99, 963)',
'partition size: 131218, first: (-1, 1002), last: (999, 997)',
'partition size: 131222, first: (-1, 101), last: (996, 996)',
'partition size: 131229, first: (-1, -1), last: (991, 1240)',
'partition size: 131233, first: (-1, 1006), last: (999, 1197)',
'partition size: 131235, first: (-1, 1001), last: (994, 992)',
'partition size: 131237, first: (-1, 1019), last: (999, -1)',
'partition size: 131240, first: (-1, 1017), last: (991, -1)']
repartition_df_res2 = ['partition size: 131222, first: (-1, 1023), last: (996, 996)',
'partition size: 131223, first: (-1, 1003), last: (999, 667)',
'partition size: 131223, first: (-1, 1012), last: (990, 990)',
'partition size: 131224, first: (-1, -1), last: (999, 1558)',
'partition size: 131224, first: (-1, 100), last: (99, 98)',
'partition size: 131224, first: (-1, 1008), last: (99, 968)',
'partition size: 131224, first: (-1, 1018), last: (999, 997)',
'partition size: 131225, first: (-1, 1006), last: (994, 992)',
'partition size: 131225, first: (-1, 101), last: (990, 935)',
'partition size: 131225, first: (-1, 1013), last: (999, 1197)']
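A workaround that guarantees stable placement at the RDD level is to derive the partition from the data itself rather than from the source partition layout. A minimal sketch, assuming each row can be keyed by a deterministic function of its columns:

from pyspark.rdd import portable_hash

# Key each row by a value that is stable across runs, then let the hash
# of the key (not the source partition index) pick the target partition.
keyed = unq_statistics.rdd.keyBy(lambda row: (row['origin'], row['dest']))
deterministic_rdd = keyed.partitionBy(10, portable_hash) \
                         .map(lambda kv: kv[1])  # drop the helper key

Because partitionBy ties every element's placement to its key, the assignment stays the same for a fixed partition count, regardless of how the upstream data was partitioned.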
Apache Spark's Resilient Distributed Datasets (RDDs) are collections of data that can be too big to fit on a single node and must therefore be partitioned across nodes. Spark automatically partitions RDDs and distributes the partitions across the cluster.
Number of Spark partitions: a good rule of thumb is to make the number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are used optimally.
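For example, a quick check (a sketch assuming an existing SparkContext sc):

print(sc.defaultParallelism)        # typically the total number of executor cores
rdd = sc.parallelize(range(1000))   # defaults to sc.defaultParallelism partitions
print(rdd.getNumPartitions())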
As already mentioned above, one partition is created for each block of a file in HDFS (64 MB by default on older HDFS versions). However, when creating an RDD, a second argument can be passed that defines the number of partitions to be created, as in the line below, which creates an RDD named textFile with 5 partitions.
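A minimal sketch, assuming a SparkContext sc and a hypothetical HDFS path:

textFile = sc.textFile("hdfs://namenode:8020/data/sample.txt", 5)  # path is hypothetical
print(textFile.getNumPartitions())  # at least 5; minPartitions is a lower bound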
Let's look at the source of repartition (which delegates to coalesce(numPartitions, shuffle = true)), and specifically its shuffle part:
...
if (shuffle) {
  /** Distributes elements evenly across output partitions, starting from a random partition. */
  val distributePartition = (index: Int, items: Iterator[T]) => {
    var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
    items.map { t =>
      // Note that the hash code of the key will just be the key itself. The HashPartitioner
      // will mod it with the number of total partitions.
      position = position + 1
      (position, t)
    }
  } : Iterator[(Int, T)]
...
As you can see, the distribution of elements from a given source partition N into X target partitions is a simple increment (later modulo'ed by X) starting from a number that depends only on that N, and is hence pre-determined. So if your source RDD is unchanged, the result of repartition(X) should be the same every time as well.
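To make that concrete, here is a minimal Python sketch of the same logic (not Spark's actual code path: the byteswap32-based seed is approximated with a plain seeded Random, and the HashPartitioner's modulo is applied inline):

import random

def distribute_partition(index, items, num_partitions):
    # The starting position depends only on the source partition index,
    # so it is the same on every run.
    position = random.Random(index).randint(0, num_partitions - 1)
    result = []
    for t in items:
        position += 1  # simple increment per element
        result.append((position % num_partitions, t))  # HashPartitioner's mod
    return result

# Same inputs -> the same assignment, every time:
print(distribute_partition(0, ['A', 'B', 'C', 'D'], 3))
print(distribute_partition(0, ['A', 'B', 'C', 'D'], 3))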
Internally, Spark uses a default partitioner (a HashPartitioner, depending on the data) to partition the data, which uses the hash of the key to identify which partition an item belongs to. Thus, a data item will always go to the same partition given that the partition count stays the same; if the partition count changes, the partition derived from the hash changes as well.
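In PySpark, the corresponding hash function is portable_hash, so you can check where a given key would land (the key below is hypothetical):

from pyspark.rdd import portable_hash

num_partitions = 10
key = ('ORD', 'LAX')  # hypothetical (origin, dest) key
print(portable_hash(key) % num_partitions)  # the same partition index on every run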