I need some help getting my brain around designing an (efficient) markov chain in spark (via python). I've written it as best as I could, but the code I came up with doesn't scale.. Basically for the various map stages, I wrote custom functions and they work fine for sequences of a couple thousand, but when we get in the 20,000+ (and I've got some up to 800k) things slow to a crawl.
For those of you not familiar with markov moodels, this is the gist of it..
This is my data.. I've got the actual data (no header) in an RDD at this point.
ID, SEQ
500, HNL, LNH, MLH, HML
We look at sequences in tuples, so
(HNL, LNH), (LNH,MLH), etc..
And I need to get to this point.. where I return a dictionary (for each row of data) that I then serialize and store in an in memory database.
{500:
{HNLLNH : 0.333},
{LNHMLH : 0.333},
{MLHHML : 0.333},
{LNHHNL : 0.000},
etc..
}
So in essence, each sequence is combined with the next (HNL,LNH become 'HNLLNH'), then for all possible transitions (combinations of sequences) we count their occurrence and then divide by the total number of transitions (3 in this case) and get their frequency of occurrence.
There were 3 transitions above, and one of those was HNLLNH.. So for HNLLNH, 1/3 = 0.333
As a side not, and I'm not sure if it's relevant, but the values for each position in a sequence are limited.. 1st position (H/M/L), 2nd position (M/L), 3rd position (H,M,L).
What my code had previously done was to collect() the rdd, and map it a couple times using functions I wrote. Those functions first turned the string into a list, then merged list[1] with list[2], then list[2] with list[3], then list[3] with list[4], etc.. so I ended up with something like this..
[HNLLNH],[LNHMLH],[MHLHML], etc..
Then the next function created a dictionary out of that list, using the list item as a key and then counted the total ocurrence of that key in the full list, divided by len(list) to get the frequency. I then wrapped that dictionary in another dictionary, along with it's ID number (resulting in the 2nd code block, up a above).
Like I said, this worked well for small-ish sequences, but not so well for lists with a length of 100k+.
Also, keep in mind, this is just one row of data. I have to perform this operation on anywhere from 10-20k rows of data, with rows of data varying between lengths of 500-800,000 sequences per row.
Any suggestions on how I can write pyspark code (using the API map/reduce/agg/etc.. functions) to do this efficiently?
EDIT Code as follows.. Probably makes sense to start at the bottom. Please keep in mind I'm learning this(Python and Spark) as I go, and I don't do this for a living, so my coding standards are not great..
def f(x):
# Custom RDD map function
# Combines two separate transactions
# into a single transition state
cust_id = x[0]
trans = ','.join(x[1])
y = trans.split(",")
s = ''
for i in range(len(y)-1):
s= s + str(y[i] + str(y[i+1]))+","
return str(cust_id+','+s[:-1])
def g(x):
# Custom RDD map function
# Calculates the transition state probabilities
# by adding up state-transition occurrences
# and dividing by total transitions
cust_id=str(x.split(",")[0])
trans = x.split(",")[1:]
temp_list=[]
middle = int((len(trans[0])+1)/2)
for i in trans:
temp_list.append( (''.join(i)[:middle], ''.join(i)[middle:]) )
state_trans = {}
for i in temp_list:
state_trans[i] = temp_list.count(i)/(len(temp_list))
my_dict = {}
my_dict[cust_id]=state_trans
return my_dict
def gen_tsm_dict_spark(lines):
# Takes RDD/string input with format CUST_ID(or)PROFILE_ID,SEQ,SEQ,SEQ....
# Returns RDD of dict with CUST_ID and tsm per customer
# i.e. {cust_id : { ('NLN', 'LNN') : 0.33, ('HPN', 'NPN') : 0.66}
# creates a tuple ([cust/profile_id], [SEQ,SEQ,SEQ])
cust_trans = lines.map(lambda s: (s.split(",")[0],s.split(",")[1:]))
with_seq = cust_trans.map(f)
full_tsm_dict = with_seq.map(g)
return full_tsm_dict
def main():
result = gen_tsm_spark(my_rdd)
# Insert into DB
for x in result.collect():
for k,v in x.iteritems():
db_insert(k,v)
You can try something like below. It depends heavily on tooolz
but if you prefer to avoid external dependencies you can easily replace it with some standard Python libraries.
from __future__ import division
from collections import Counter
from itertools import product
from toolz.curried import sliding_window, map, pipe, concat
from toolz.dicttoolz import merge
# Generate all possible transitions
defaults = sc.broadcast(dict(map(
lambda x: ("".join(concat(x)), 0.0),
product(product("HNL", "NL", "HNL"), repeat=2))))
rdd = sc.parallelize(["500, HNL, LNH, NLH, HNL", "600, HNN, NNN, NNN, HNN, LNH"])
def process(line):
"""
>>> process("000, HHH, LLL, NNN")
('000', {'LLLNNN': 0.5, 'HHHLLL': 0.5})
"""
bits = line.split(", ")
transactions = bits[1:]
n = len(transactions) - 1
frequencies = pipe(
sliding_window(2, transactions), # Get all transitions
map(lambda p: "".join(p)), # Joins strings
Counter, # Count
lambda cnt: {k: v / n for (k, v) in cnt.items()} # Get frequencies
)
return bits[0], frequencies
def store_partition(iter):
for (k, v) in iter:
db_insert(k, merge([defaults.value, v]))
rdd.map(process).foreachPartition(store_partition)
Since you know all possible transitions I would recommend using a sparse representation and ignore zeros. Moreover you can replace dictionaries with sparse vectors to reduce memory footprint.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With