Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Custom Partitioner in Pyspark 2.1.0

I read that RDDs with the same partitioner will be co-located. This is important to me because I want to join several large Hive tables that are not partitioned. My theory is that if I can get them partitioned (by a field call date_day) and co-located then I would avoid shuffling .

Here is what I am trying to do for each table:

def date_day_partitioner(key):
  return (key.date_day - datetime.date(2017,05,01)).days

df = sqlContext.sql("select * from hive.table")
rdd = df.rdd
rdd2 = rdd.partitionBy(100, date_day_partitioner)
df2 = sqlContext.createDataFrame(rdd2, df_log_entry.schema)

print df2.count()

Unfortunately, I can't even test my theory about co-location and avoiding shuffling, because I get the following error when I try partitionBy: ValueError: too many values to unpack

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-118755547579363441.py", line 346, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-118755547579363441.py", line 339, in <module>
    exec(code)
  File "<stdin>", line 15, in <module>
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 380, in count
    return int(self._jdf.count())
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o115.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 21 in stage 6.0 failed 4 times, most recent failure: Lost task 21.3 in stage 6.0 (TID 182, ip-172-31-49-209.ec2.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/zeppelin/appcache/application_1509802099365_0013/container_1509802099365_0013_01_000007/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1752, in add_shuffle_key
ValueError: too many values to unpack
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
...

I must be doing something wrong, could you please help?

like image 657
user2360416 Avatar asked Oct 17 '22 03:10

user2360416


1 Answers

It's happening because you are not applying partitionBy on key-value pair rdd. Your rdd must be in key-value pair. Also, your key type should be integer. I don't have sample data for your hive table. So let's demonstrate the fact using below hive table:

I have created a below dataframe using hive table :

df = spark.table("udb.emp_details_table");
+------+--------+--------+----------------+
|emp_id|emp_name|emp_dept|emp_joining_date|
+------+--------+--------+----------------+
|     1|     AAA|      HR|      2018-12-06|
|     1|     BBB|      HR|      2017-10-26|
|     2|     XXX|   ADMIN|      2018-10-22|
|     2|     YYY|   ADMIN|      2015-10-19|
|     2|     ZZZ|      IT|      2018-05-14|
|     3|     GGG|      HR|      2018-06-30|
+------+--------+--------+----------------+

Now, I wish to partition my dataframe and want to keep the similar keys in one partition. So, I have converted my dataframe to rdd as you can only apply partitionBy on rdd for re-partitioning.

    myrdd = df.rdd
    newrdd = myrdd.partitionBy(10,lambda k: int(k[0]))
    newrdd.take(10)

I got the same error:

 File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1767, in add_shuffle_key
    for k, v in iterator:
ValueError: too many values to unpack 

Hence, we need to convert our rdd into key-value pair to use paritionBy

keypair_rdd = myrdd.map(lambda x : (x[0],x[1:]))

Now,you can see that rdd has been converted to key value pair and you can therefore distribute your data in partitions according to keys available.

[(u'1', (u'AAA', u'HR', datetime.date(2018, 12, 6))), 
(u'1', (u'BBB', u'HR', datetime.date(2017, 10, 26))), 
(u'2', (u'XXX', u'ADMIN', datetime.date(2018, 10, 22))), 
(u'2', (u'YYY', u'ADMIN', datetime.date(2015, 10, 19))), 
(u'2', (u'ZZZ', u'IT', datetime.date(2018, 5, 14))), 
(u'3', (u'GGG', u'HR', datetime.date(2018, 6, 30)))]

Using a paritionBy on key-value rdd now:

newrdd = keypair_rdd.partitionBy(5,lambda k: int(k[0]))

Lets take a look at the partitions. Data is grouped and similar keys are stored into similar partitions now. Two of them are empty.

>>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect()))
Partitions structure: [0, 2, 3, 1, 0]

Now lets say I want to custom partitioning my data. So I have created below function to keep keys '1' and '3' in similar partition.

def partitionFunc(key):
    import random
    if key == 1 or key == 3:
        return 0
    else:
        return random.randint(1,2)

newrdd = keypair_rdd.partitionBy(5,lambda k: partitionFunc(int(k[0])))

>>> print("Partitions structure: {}".format(newrdd.glom().map(len).collect()))
Partitions structure: [3, 3, 0, 0, 0]

As you can see now that keys 1 and 3 are stored in one partition and rest on other.

I hope this helps. You can try to partitionBy your dataframe. Make sure to convert it into key value pair and keeping key as type integer.

like image 87
vikrant rana Avatar answered Oct 19 '22 22:10

vikrant rana