
In Spark ML, why does fitting a StringIndexer on a column with millions of distinct values yield an OOM error?

I am trying to use Spark's StringIndexer feature transformer on a column with about 15,000,000 unique string values. No matter how many resources I throw at it, Spark always dies on me with some sort of Out Of Memory exception.

from pyspark.ml.feature import StringIndexer

data = spark.read.parquet("s3://example/data-raw").select("user", "count")

user_indexer = StringIndexer(inputCol="user", outputCol="user_idx")

indexer_model = user_indexer.fit(data) # This never finishes

indexer_model \
    .transform(data) \
    .write.parquet("s3://example/data-indexed")

An error file is produced on the driver; its beginning looks like this:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2657)

Now, if I manually index the values and store them in a DataFrame, everything works like a charm, all on a couple of Amazon c3.2xlarge workers.

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

data = spark.read.parquet("s3://example/data-raw").select("user", "count")

uid_map = data \
    .select("user") \
    .distinct() \
    .select("user", row_number().over(Window.orderBy("user")).alias("user_idx"))

data.join(uid_map, "user", "inner").write.parquet("s3://example/data-indexed")

I would really like to use the transformers provided by Spark, but at the moment this doesn't seem possible. Any ideas on how I can make this work?

asked Aug 24 '18 by Interfector



1 Answer

The reason you get an OOM error is that, behind the scenes, Spark's StringIndexer calls countByValue on the "user" column to collect all the distinct values.
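
Roughly, the fit step amounts to collecting a frequency map of every distinct user onto the driver, along the lines of the snippet below (an illustrative approximation, not the actual implementation):

# roughly what happens during fit(): a dict keyed by all 15M distinct
# values ends up in driver memory
label_counts = data.select("user").rdd.map(lambda row: row["user"]).countByValue()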

With 15M distinct values, you are building a huge map on the driver, and it runs out of memory. A straightforward workaround is to increase the driver's memory: with spark-submit you can pass --driver-memory 16g, or you can set the spark.driver.memory property in the config file.
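
For example (16g is only an illustration and the script name is a placeholder; size the value to your data):

spark-submit --driver-memory 16g your_indexing_job.py

# or, equivalently, in conf/spark-defaults.conf
spark.driver.memory 16g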

Yet the problem will simply occur again as the number of distinct values grows. Unfortunately, there is not much you can do about Spark's transformers, and here is why: after being fitted to the data, they are meant to be serialized for later use, so they are not designed to be this big (a map of 15M strings would weigh at least 100 MB). You should probably reconsider using a StringIndexer for that many categories; the hashing trick would likely be a better fit here, as sketched below.
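
A minimal sketch of what that could look like with FeatureHasher (the numFeatures value is illustrative; note that hashing produces a fixed-width feature vector rather than a single integer index, and collisions are possible):

from pyspark.ml.feature import FeatureHasher

# hash "user" into a sparse vector of fixed width; no per-category map
# ever has to be built or held on the driver
hasher = FeatureHasher(inputCols=["user"], outputCol="user_features", numFeatures=1 << 20)
hashed = hasher.transform(data)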

Finally, let me comment on your workaround. With your window, you put all 15M categories on a single partition and therefore on a single executor, so it won't scale as that number grows. Moreover, using a non-partitioned window is generally a bad idea, since it prevents parallel computation (on top of putting everything on the same partition, which can itself cause an OOM error). I would compute your uid_map like this:

from pyspark.sql.functions import monotonically_increasing_id

# if you don't need consecutive indices
uid_map = data \
    .select("user") \
    .distinct() \
    .withColumn("user_idx", monotonically_increasing_id())

# if you do, you need to use RDDs: zipWithIndex assigns consecutive ids
uid_rdd = data \
    .select("user") \
    .distinct() \
    .rdd.map(lambda x: x["user"]) \
    .zipWithIndex()
uid_map = spark.createDataFrame(uid_rdd, ["user", "user_idx"])
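
Either way, uid_map can then be joined back onto the original data exactly as in your manual version:

data.join(uid_map, "user", "inner").write.parquet("s3://example/data-indexed")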

answered Oct 01 '22 by Oli