I am working with Dask for the first time and trying to run predict() from a trained Keras model.
If I don't use Dask, the function works fine (i.e. pd.DataFrame() versus dd.DataFrame()). With Dask, the error is below. Is this not a common use case (aside from scoring with a groupby, perhaps)?
import keras

def calc_HR_ind_dsk(grp):
    # Load the trained embedding model inside the worker function
    model = keras.models.load_model('/home/embedding_model.h5')
    topk = 10
    x = [grp['user'].values, grp['item'].values]
    # Pair each prediction with its actual response
    pred_act = list(zip(model.predict(x)[:, 0], grp['respond'].values))
    # Keep the top-k predictions and count the hits among them
    top = sorted(pred_act, key=lambda p: -p[0])[0:topk]
    hit = sum([p[1] for p in top])
    return hit
import dask.dataframe as dd

# Step 1 - read in the data as a Dask DataFrame. We could reference more than one file using the '*' wildcard.
df = dd.read_csv('/home/test_coded_final.csv', dtype='int64')
results = df.groupby('user').apply(calc_HR_ind_dsk).compute()
TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder_30:0", shape=(55188, 32), dtype=float32) is not an element of this graph.
We can also use open-source scalers like Dask to parallelize this (read -> preprocess -> save) workflow. There is one last detail before we can look at code snippets that do this: TensorFlow can also read data in the TFRecord format very efficiently.
Use pandas: For data that fits into RAM, pandas can often be faster and easier to use than Dask DataFrame. While "Big Data" tools can be exciting, they are almost always worse than normal data tools while those remain appropriate.
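To illustrate the pandas path, the same groupby-apply pattern from the question runs directly on an in-memory DataFrame. This is a minimal sketch with made-up data and a precomputed "score" column standing in for the model's predictions:

```python
import pandas as pd

df = pd.DataFrame({
    "user": [1, 1, 2, 2],
    "score": [0.9, 0.1, 0.4, 0.8],   # stand-in for model.predict() output
    "respond": [1, 0, 0, 1],
})

def top1_hits(grp):
    # Take the highest-scoring row per user and count its responses.
    top = grp.sort_values("score", ascending=False).head(1)
    return top["respond"].sum()

hits = df.groupby("user").apply(top1_hits)
```

If this fits in RAM, there is no need for Dask at all; the Dask version only pays off once the data or the per-group work outgrows a single process.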
Many people use Dask alongside GPU-accelerated libraries like PyTorch and TensorFlow to manage workloads across several machines. They typically use Dask's custom APIs, notably Delayed and Futures. Dask doesn't need to know that these functions use GPUs. It just runs Python functions.
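The Delayed API mentioned above can be sketched with plain Python functions (no GPUs are needed to see the pattern; the function body could just as well call into PyTorch or TensorFlow):

```python
import dask

@dask.delayed
def square(x):
    # Any plain Python function can be wrapped; Dask does not
    # inspect what happens inside it.
    return x * x

# Build a lazy task graph, then execute it with compute().
total = dask.delayed(sum)([square(i) for i in range(4)])
result = total.compute()  # 0 + 1 + 4 + 9 = 14
```

Nothing runs until compute() is called; until then, Dask is only assembling a task graph it can schedule across threads, processes, or machines.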
I found the answer. It is an issue with Keras/TensorFlow: https://github.com/keras-team/keras/issues/2397
The code below worked, and using Dask shaved 50% off the runtime versus a standard pandas groupby.
# dask
import keras
import tensorflow as tf
import dask.dataframe as dd

# Load the model once, then capture the default graph so that
# worker threads can reuse it (this part is from
# https://github.com/keras-team/keras/issues/2397).
model = keras.models.load_model('/home/embedding_model.h5')
global graph
graph = tf.get_default_graph()

def calc_HR_ind_dsk(grp):
    topk = 10
    x = [grp['user'].values, grp['item'].values]
    # Run predict() inside the captured graph; without this,
    # worker threads see a different default graph and raise the
    # "not an element of this graph" TypeError.
    with graph.as_default():
        pred_act = list(zip(model.predict(x)[:, 0], grp['respond'].values))
    top = sorted(pred_act, key=lambda p: -p[0])[0:topk]
    hit = sum([p[1] for p in top])
    return hit

df = dd.read_csv('/home/test_coded_final.csv', dtype='int64')
results = df.groupby('user').apply(calc_HR_ind_dsk).compute()