Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How i can maintain a temporary dictionary in a pyspark application?

I want to use pretrained embedding model (fasttext) in a pyspark application.

So if I broadcast the file (.bin), the following exception is thrown: Traceback (most recent call last):

cPickle.PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 2 GiB

Instead, I tried to use sc.addFile(modelpath) where modelpath=path/to/model.bin as following:

i create a file called fasttextSpark.py

import gensim
from gensim.models.fasttext import FastText as FT_gensim
# Load model (loads when this library is being imported)
model = FT_gensim.load_fasttext_format("/project/6008168/bib/wiki.en.bin")

# This is the function we use in UDF to predict the language of a given msg
def get_vector(msg):
    pred = model[msg]
    return pred

and testSubmit.sh:

#!/bin/bash
#SBATCH -N 2
#SBATCH -t 00:10:00
#SBATCH --mem 20000
#SBATCH --ntasks-per-node 1
#SBATCH --cpus-per-task 32
module load python/2.7.14
source "/project/6008168/bib/ENV2.7.14/bin/activate"
module load spark/2.3.0
spark-submit /project/6008168/bib/test.py

and the test.py:

from __future__ import print_function
import sys
import time
import math
import csv
import datetime
import StringIO
import pyspark
import gensim
from operator import add
from pyspark.sql import *
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from gensim.models.fasttext import FastText as FT_gensim
appName = "bib"
modelpath = "/project/6008168/bib/wiki.en.bin"
conf = (SparkConf()
         .setAppName(appName)
         .set("spark.executor.memory", "12G")
         .set("spark.network.timeout", "800s")
         .set("spark.executor.heartbeatInterval", "20s")
         .set("spark.driver.maxResultSize", "12g")
         .set("spark.executor.instances", 2)
         .set("spark.executor.cores", 30)
         )
sc = SparkContext(conf = conf)
#model = FT_gensim.load_fasttext_format(modelpath)
sc.addFile(modelpath)
sc.addPyFile("/project/6008168/bib/fasttextSpark.py")

# Import our custom fastText language classifier lib
import fasttextSpark
print ("nights = ", fasttextSpark.get_vector("nights"))
print ("done")

Now, each node will have a copy of the pretrained dataset. Some words are out of vocabulary so each time I'm facing such words I want to create for it a random but fixed vector and add the word and its vector to a dictionary.

So, How I can maintain such a dictionary in each node?

Indeed, suppose my rdd is as following my_rdd = (id, sentence) and I want to find the embedding vector of the sentence by summing up the vectors of its words. How many times the embedding model will be loaded. For example:

suppose rdd=("id1", "motorcycle parts"), does my implementation load the model two times: one for motorcycle and one for parts? if yes, my approach is inefficacce? In this case what it should be the best approaches to be applied?

like image 471
bib Avatar asked Nov 06 '22 21:11

bib


1 Answers

Module variables in Python are evaluated once, when the module is loaded. So the variable will be loaded once per interpreter and kept alive as long as the interpreter is kept alive.

However Spark worker processes don't share memory, so there will be one copy of the dictionary per worker process. The same would be true if you had a broadcast variable.

So your current solution is as close to what you want as you can get, without using low level primitives (like memory mapping) or external storage.

like image 82
user11001748 Avatar answered Nov 14 '22 06:11

user11001748