 

How many copies of the environment does Spark make?

I have a PySpark application that has to process about 5 GB of compressed data (strings). I'm using a small server with 12 cores (24 threads) and 72 GB of RAM. My PySpark program consists of only 2 map operations, helped by 3 very big regexes (3 GB each, already compiled) that are loaded with pickle. Spark is running in standalone mode with the worker and the master on the same machine.

My question is: does Spark replicate each variable for each executor core? I ask because it uses all the available memory and then a lot of swap space. Or does it perhaps load all the partitions into RAM? The RDD contains about 10 million strings that have to be searched with the 3 regexes, split across about 1000 partitions. I have trouble finishing this task because after a few minutes the memory is full and Spark starts using the swap space, becoming very, very slow. I've noticed that the situation is the same without the regexes.

This is my code; it drops all the useless fields from Twitter tweets and scans the tweets' texts and user descriptions for particular words:

import json
import re
import twitter_util as twu
import pickle

from pyspark import SparkContext
sc = SparkContext()

prefix = '/home/lucadiliello'

source = prefix + '/data/tweets'
dest = prefix + '/data/complete_tweets'

# Regex paths
companies_names_regex = prefix + '/data/comp_names_regex'
companies_names_dict = prefix + '/data/comp_names_dict'
companies_names_dict_to_legal = prefix + '/data/comp_names_dict_to_legal'

# Loading the regexes
comp_regex = pickle.load(open(companies_names_regex))
comp_dict = pickle.load(open(companies_names_dict))
comp_dict_legal = pickle.load(open(companies_names_dict_to_legal))

# Loading the RDD from the text file
tx = sc.textFile(source).map(lambda a: json.loads(a))


def get_device(input_text):
    output_text = re.sub('<[^>]*>', '', input_text)
    return output_text

def filter_data(a):
    res = {}
    try:
        res['mentions'] = a['entities']['user_mentions']
        res['hashtags'] = a['entities']['hashtags']
        res['created_at'] = a['created_at'] 
        res['id'] = a['id'] 

        res['lang'] = a['lang']
        if 'place' in a and a['place'] is not None:      
            res['place'] = {} 
            res['place']['country_code'] = a['place']['country_code'] 
            res['place']['place_type'] = a['place']['place_type'] 
            res['place']['name'] = a['place']['name'] 
            res['place']['full_name'] = a['place']['full_name']

        res['source'] = get_device(a['source'])
        res['text'] = a['text'] 
        res['timestamp_ms'] = a['timestamp_ms'] 

        res['user'] = {} 
        res['user']['created_at'] = a['user']['created_at'] 
        res['user']['description'] = a['user']['description'] 
        res['user']['followers_count'] = a['user']['followers_count'] 
        res['user']['friends_count'] = a['user']['friends_count']
        res['user']['screen_name'] = a['user']['screen_name']
        res['user']['lang'] = a['user']['lang']
        res['user']['name'] = a['user']['name']
        res['user']['location'] = a['user']['location']
        res['user']['statuses_count'] = a['user']['statuses_count']
        res['user']['verified'] = a['user']['verified']
        res['user']['url'] = a['user']['url']
    except KeyError:
        return []

    return [res]


results = tx.flatMap(filter_data)


def setting_tweet(tweet):

    text = tweet['text'] if tweet['text'] is not None else ''
    descr = tweet['user']['description'] if tweet['user']['description'] is not None else ''
    del tweet['text']
    del tweet['user']['description']

    tweet['text'] = {}
    tweet['user']['description'] = {}
    del tweet['mentions']

    #tweet
    tweet['text']['original_text'] = text
    tweet['text']['mentions'] = twu.find_retweet(text)
    tweet['text']['links'] = []
    for j in twu.find_links(text):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['text']['links'].append(tmp)
        except ValueError:
            pass

    tweet['text']['companies'] = []
    for x in comp_regex.findall(text.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['text']['companies'].append(tmp)

    # descr
    tweet['user']['description']['original_text'] = descr
    tweet['user']['description']['mentions'] = twu.find_retweet(descr)
    tweet['user']['description']['links'] = []
    for j in twu.find_links(descr):
        tmp = {}
        try:
            tmp['host'] = twu.get_host(j)
            tmp['link'] = j
            tweet['user']['description']['links'].append(tmp)
        except ValueError:
            pass

    tweet['user']['description']['companies'] = []
    for x in comp_regex.findall(descr.lower()):
        tmp = {}
        tmp['id'] = comp_dict[x.lower()]
        tmp['name'] = x
        tmp['legalName'] = comp_dict_legal[x.lower()]
        tweet['user']['description']['companies'].append(tmp)

    return tweet


res = results.map(setting_tweet)

res.map(lambda a: json.dumps(a)).saveAsTextFile(dest, compressionCodecClass="org.apache.hadoop.io.compress.BZip2Codec")

UPDATE: After about 1 hour, the memory (72 GB) is completely full, and so is the swap (72 GB). Using broadcasting is not a solution in my case.

UPDATE 2: Without loading the 3 variables with pickle, it finishes without problems using at most 10 GB of RAM, instead of 144 GB! (72 GB RAM + 72 GB swap)

asked May 13 '17 by Luca Di Liello


1 Answer

My question is: does Spark replicate each variable for each executor core?

Yes!

The number of copies of each (local) variable is equal to the number of Python worker processes you end up with, which is roughly one per executor core that is running tasks.
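
A quick way to see this duplication for yourself is to count the distinct PIDs of the Python worker processes that actually run your tasks; each of those processes deserializes its own copy of whatever the task closure carries. (This is just an illustrative snippet, not code from the question; the element and partition counts are arbitrary.)

import os
from pyspark import SparkContext

sc = SparkContext()

# Run a trivial job over many partitions and record which Python worker
# process handled each element. The number of distinct PIDs is the number
# of worker processes, and therefore of copies of any captured variable.
pids = sc.parallelize(range(10000), 100) \
         .map(lambda _: os.getpid()) \
         .distinct() \
         .collect()
print(len(pids))

On a standalone worker with 24 threads this typically prints something close to the number of cores Spark is allowed to use.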


As for your problem, try loading comp_regex, comp_dict and comp_dict_legal without using pickle.
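
For illustration, a related pattern (not the suggestion above) is to stop capturing the pickled objects in the driver-side closure, which is serialized into every one of the ~1000 tasks, and instead load them on the worker side inside mapPartitions. Each concurrently running worker still ends up with its own copy, so this does not by itself shrink peak memory, but it makes the duplication explicit. The paths and dictionary names come from the question; the body is a simplified, hypothetical stand-in for setting_tweet:

import pickle

prefix = '/home/lucadiliello'

def tag_partition(tweets):
    # Loaded on the worker once per partition, so the big objects are
    # never pickled into the task closure shipped from the driver.
    with open(prefix + '/data/comp_names_regex', 'rb') as f:
        comp_regex = pickle.load(f)
    with open(prefix + '/data/comp_names_dict', 'rb') as f:
        comp_dict = pickle.load(f)

    for tweet in tweets:
        text = (tweet['text'] or '').lower()
        # Simplified stand-in for the company tagging done in setting_tweet.
        tweet['companies'] = [{'name': x, 'id': comp_dict[x]}
                              for x in comp_regex.findall(text)]
        yield tweet

# res = results.mapPartitions(tag_partition)

With 1000 partitions this also re-reads the files once per partition, so in practice it would only make sense combined with far fewer, larger partitions.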

answered Oct 31 '22 by gsamaras