How to resolve: very large size tasks in Spark

Here I am pasting the Python code that I am running on Spark in order to perform some analysis on data. I am able to run the program on a small data-set, but with a large data-set it says "Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB".

import os
import sys
import unicodedata
from operator import add

try:
    from pyspark import SparkConf
    from pyspark import SparkContext
except ImportError as e:
    print ("Error importing Spark Modules", e)
    sys.exit(1)

def tokenize(text):
    resultDict = {}
    # normalize the line to plain ASCII so the delimiter characters below are predictable
    text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore')

    # delimiter characters taken from the line itself
    str1 = text[1]
    str2 = text[0]

    arrText = text.split(str1)

    ss1 = arrText[0].split("/")

    docID = ss1[0].strip()

    docName = ss1[1].strip()

    # emit a (docID_docName, 1) pair for the count below
    resultDict[docID + "_" + docName] = 1

    return resultDict.iteritems()

sc = SparkContext('local')
textfile = sc.textFile("path to my data")
fileContent = textfile.flatMap(tokenize)
# pull everything back to the driver, then redistribute it as a new RDD
rdd = sc.parallelize(fileContent.collect())
rdd = rdd.map(lambda x: (x[0], x[1])).reduceByKey(add)
print rdd.collect()
#reduceByKey(lambda a,b: a+b)
rdd.coalesce(1).saveAsTextFile("path to result")

Here is a little more of the warning output. The job does not progress after this point; can someone help me with this?

16/06/10 19:19:58 WARN TaskSetManager: Stage 1 contains a task of very large size (17693 KB). The maximum recommended task size is 100 KB.
16/06/10 19:19:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5314, localhost, partition 0,PROCESS_LOCAL, 18118332 bytes)
16/06/10 19:19:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 5314)
16/06/10 19:43:00 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:43480 in memory (size: 3.9 KB, free: 511.1 MB)
16/06/10 19:43:00 INFO ContextCleaner: Cleaned accumulator 2
asked Jun 11 '16 by Baradwaj Aryasomayajula



1 Answer

When Spark serializes a task, it recursively serializes the full closure context. In this case, the logical culprit seems to be the unicodedata module, which you use in tokenize. I may be wrong, but I don't see any other heavy data structures in the code. (Caveat: I typically use Spark with Scala and my Python is rusty.) I wonder whether the library is backed by heavy data structures that are not available on the executor nodes.
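To make the closure issue concrete, here is a minimal, hypothetical sketch (none of these names come from the question): any large driver-side object referenced inside a function passed to an RDD operation gets pickled into every task, which is the kind of thing that produces the "very large size" warning.

from pyspark import SparkContext

sc = SparkContext('local')

# Hypothetical large object living on the driver.
big_table = dict((i, str(i)) for i in range(2000000))

rdd = sc.parallelize(range(100))

# big_table is captured by the lambda's closure, so every serialized task
# carries the whole dict along with it.
result = rdd.map(lambda x: len(big_table.get(x, ''))).collect()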

The typical patterns for dealing with these types of problems are:

  1. Make sure all libraries are available on the executor nodes.

  2. Use broadcast variables to distribute heavy data structures to executors (a minimal sketch follows this list).
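Here is a minimal sketch of pattern 2, assuming the heavy structure is something like a lookup table the tokenizer needs; stopwords.txt and all names here are placeholders, not taken from the original code.

from operator import add
from pyspark import SparkContext

sc = SparkContext('local')

# Hypothetical heavy driver-side structure the tasks need.
stopwords = set(open("stopwords.txt").read().split())

# Broadcast it once; each executor caches a single deserialized copy
# instead of receiving it inside every task.
bc_stopwords = sc.broadcast(stopwords)

def tokenize(line):
    # Reference bc_stopwords.value inside the task rather than closing
    # over the driver-side set directly.
    return [(w, 1) for w in line.split() if w not in bc_stopwords.value]

counts = sc.textFile("path to my data").flatMap(tokenize).reduceByKey(add)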

Separately, unless you are using it as a debugging tool, you are doing a lot of unnecessary collection of all the data back to the driver with collect. The transformations can be chained:

sc.textFile(...).flatMap(...).map(...).reduceByKey(add).coalesce(1).saveAsTextFile(...)
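Applied to the code in the question, a sketch of that rewrite could look like the following (the paths are the same placeholders as in the question; since tokenize already emits (key, 1) pairs, the intermediate map is not strictly needed):

counts = (sc.textFile("path to my data")
            .flatMap(tokenize)        # yields (docID_docName, 1) pairs
            .reduceByKey(add))        # sums the counts on the executors

# Write directly from the executors instead of collecting to the driver
# and re-parallelizing.
counts.coalesce(1).saveAsTextFile("path to result")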
answered Sep 18 '22 by Sim