I am trying to solve the following problem using PySpark. I have a file on HDFS which is a dump of a lookup table, in this format:
key1, value1
key2, value2
...
I want to load this into a Python dictionary in PySpark and use it for some other purpose. So I tried to do:
table = {}

def populateDict(line):
    (k, v) = line.split(",", 1)
    table[k] = v
kvfile = sc.textFile("pathtofile")
kvfile.foreach(populateDict)
I found that the table variable is not modified. So, is there a way to create a large in-memory hashtable in Spark?
foreach is a distributed computation, so you can't expect it to modify a data structure that is only visible in the driver. What you want is:
kv.map(line => line.split(",", 2) match {
  case Array(k, v) => (k, v)
  case _ => ("", "")
}).collectAsMap()
This is in Scala, but you get the idea: the important function is collectAsMap(), which returns a map to the driver.
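The same thing in PySpark, closer to what the question starts from, might look like this (a sketch; "pathtofile" is a placeholder and the file is assumed to keep the comma-separated layout above):
kvfile = sc.textFile("pathtofile")
# Split each line once on the first comma and bring the pairs back
# to the driver as a plain Python dict.
table = kvfile.map(lambda line: tuple(line.split(",", 1))).collectAsMap()
After this, table is an ordinary dictionary on the driver, so the whole lookup table has to fit in the driver's memory.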
If your data is very large, you can use a PairRDD as a map. First map to pairs:
kv.map(line => line.split(",", 2) match {
  case Array(k, v) => (k, v)
  case _ => ("", "")
})
then you can access it with rdd.lookup("key"), which returns a sequence of values associated with the key, though this will definitely not be as efficient as other distributed KV stores, since Spark isn't really built for that.
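A rough PySpark equivalent of this pair-RDD approach (again assuming the comma-separated layout and a placeholder path):
pairs = sc.textFile("pathtofile").map(lambda line: tuple(line.split(",", 1)))
pairs.lookup("key1")   # returns a list of the values stored under "key1"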
For efficiency, see: sortByKey() and lookup()
lookup(key):
Return the list of values in the RDD for key key. This operation is done efficiently if the RDD has a known partitioner by only searching the partition that the key maps to.
The RDD will be re-partitioned by sortByKey() (see: OrderedRDD) and efficiently searched during lookup()
calls. In code, something like this:
kvfile = sc.textFile("pathtofile")
# map (not flatMap) each line to a (key, value) pair before sorting
sorted_kv = kvfile.map(lambda x: tuple(x.split(",", 1))).sortByKey()
sorted_kv.lookup('key1')[:10]   # lookup() returns a plain list, so slice it for the first 10 values
will do the trick, keeping the data as an RDD while still allowing efficient lookups.
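If you expect to call lookup() many times, it may also help to cache the sorted RDD so the sort and range partitioning are not recomputed on each call (a small sketch reusing sorted_kv from above):
sorted_kv.cache()          # keep the sorted, range-partitioned pairs in memory
sorted_kv.lookup('key1')   # only the partition holding 'key1' is computed and cached
sorted_kv.lookup('key1')   # later lookups for keys in that partition reuse the cached data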