 

Best way to extract and save values with the same keys from multiple RDDs

I've created two RDDs in PySpark with data extracted from HBase. I want to gather items with the same row keys, store the items and then search through values associated with each of the items. Ideally I'd store the results in a pyspark.sql object, since I want to apply Levenshtein distance to their content.

Details:

In HBase I have location data, where the row key is the geohash of a given area, and the columns contain multiple venues in that area with more details (JSON with a description and other text data) on each location. I have two HBase tables, and the locations can be the same in both of them. I want to search the data in those two RDDs, check for similar geohashes and store the results in a new data structure.

I don't want to reinvent the wheel, and I've just started learning Spark, so I'm wondering: what's the best way to do such a task? Is the built-in function rdd.intersection a good solution?
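For the Levenshtein part, here's a rough sketch of what I have in mind, assuming I can get each RDD into (geohash, details) pairs (the column names are made up, and pyspark.sql.functions.levenshtein exists since Spark 1.5):

from pyspark.sql import functions as F

# Hypothetical layout: each RDD holds (geohash, details) pairs
df1 = spark.createDataFrame(rdd1, ["geohash", "details1"])
df2 = spark.createDataFrame(rdd2, ["geohash", "details2"])

# Keep only the geohashes present in both tables, then compare
# the venue details with Levenshtein distance
matched = (df1.join(df2, "geohash")
              .withColumn("lev_dist",
                          F.levenshtein(F.col("details1"), F.col("details2"))))
matched.show()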

asked by Angie


1 Answer

Edit: Thanks to @Aneel's comments I could correct some of my mistakes: there is a join call on RDDs that gives the same result as a JOIN in Spark SQL (the join is done on the first element of each pair, and the values become a tuple of the remaining elements from both RDDs). That is what you want here, rather than the cogroup I originally pointed to, since, as @Aneel noted, cogroup squashes all of a key's values together under that single key.
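To illustrate the difference on toy data (keys and values below are made up, and the output order may vary): join emits one record per matching pair of values, while cogroup emits one record per key with all values grouped into iterables.

a = sc.parallelize([("geo1", "venueA"), ("geo2", "venueB")])
b = sc.parallelize([("geo1", "venueC"), ("geo3", "venueD")])

# join: one output record per matching pair, keyed by the first element
a.join(b).collect()
# -> [('geo1', ('venueA', 'venueC'))]

# cogroup: one record per key, with the values from each RDD squashed
# into iterables (empty when the key is missing on one side)
[(k, (list(v1), list(v2))) for k, (v1, v2) in a.cogroup(b).collect()]
# -> [('geo1', (['venueA'], ['venueC'])),
#     ('geo2', (['venueB'], [])),
#     ('geo3', ([], ['venueD']))]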

Now on a different note: I tried both approaches (Spark SQL join and plain RDD join) and benchmarked them a little, using Databricks' Community Edition (a very small cluster: 6 GB of memory, 1 core, Spark 2.1). The code is also at the end of the post.

Here are the results:

  • For a 100000 sized list:
    • Spark SQL: 1.32s
    • RDD join: 0.89s
  • For a 250000 sized list:
    • Spark SQL: 2.2s
    • RDD join: 2.0s
  • For a 500000 sized list:
    • Spark SQL: 3.6s
    • RDD join: 4.6s
  • For a 1000000 sized list:
    • Spark SQL: 7.7s
    • RDD join: 10.2s
  • For a 10000000 sized list (here I told timeit to run only 10 tests, otherwise it would still be running at Christmas; the precision is therefore lower):
    • Spark SQL: 57.6s
    • RDD join: 89.9s

It looks like RDD joins are faster than DataFrame joins for small datasets, but once you reach a threshold (somewhere between 250k and 500k records in this test), the DataFrame join starts to be faster.

Finally, as @Aneel suggested, bear in mind that this is a pretty simple example; you should do some testing on your own data and environment (I did not go beyond 10M lines in my two lists because initialization alone already took 2.6 minutes).

Initialization code:

#Init code
NUM_TESTS = 100
from random import randint
import timeit

l1 = []
l2 = []

# Build two lists of ((int, int), int) pairs; l2 reuses l1's key about
# 25% of the time so the joins have matches to find.
for i in range(0, 10000000):  # xrange on Python 2
  t = (randint(0, 2000), randint(0, 2000))
  v = randint(0, 2000)
  l1.append((t, v))
  if randint(0, 100) > 25:  # ~75% of the time, draw a fresh key for l2
    t = (randint(0, 2000), randint(0, 2000))
  v = randint(0, 2000)
  l2.append((t, v))

rdd1 = sc.parallelize(l1)
rdd2 = sc.parallelize(l2)

Spark SQL test:

#Test Spark SQL    
def callable_ssql_timeit():
  df1 = spark.createDataFrame(rdd1).toDF("id", "val")
  df1.createOrReplaceTempView("table1")
  df2 = spark.createDataFrame(rdd2).toDF("id", "val")
  df2.createOrReplaceTempView("table2")
  query="SELECT * FROM table1 JOIN table2 ON table1.id=table2.id"
  spark.sql(query).count()


print(str(timeit.timeit(callable_ssql_timeit, number=NUM_TESTS)/float(NUM_TESTS)) +  "s")

RDD join test:

#Test RDD join
def callable_rdd_timeit():
  rdd1.join(rdd2).count()
print(str(timeit.timeit(callable_rdd_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")
answered by Adonis


