 

Performance decrease for a huge number of columns (PySpark)

I ran into a problem processing a wide Spark dataframe (about 9000 columns, sometimes more).
Task:

  1. Create the wide DF via groupBy and pivot.
  2. Transform the columns to a vector and run KMeans from pyspark.ml on it.

So I built the wide frame, created the vector with VectorAssembler, cached it and trained KMeans on it.
On my PC in standalone mode, assembling the ~500x9000 frame took about 11 minutes, and KMeans for 7 different cluster counts took another 2 minutes. On the other hand, the same processing in pandas (pivot the df, then iterate over the cluster counts) takes less than one minute; a rough pandas sketch is included after the Spark config below.
Obviously I understand there is overhead from standalone mode, caching and so on, but it still really discourages me.
Could somebody explain how I can avoid this overhead?
How do people work with wide DFs instead of using VectorAssembler and taking the performance hit?
A more formal question (to fit SO rules) would be: how can I speed up this code?

%%time
# imports used by the snippets below
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

tmp = (df_states.select('ObjectPath', 'User', 'PropertyFlagValue')
       .groupBy('User')
       .pivot('ObjectPath')
       .agg({'PropertyFlagValue': 'max'})
       .fillna(0))
ignore = ['User']
assembler = VectorAssembler(
    inputCols=[x for x in tmp.columns if x not in ignore],
    outputCol='features')
Wall time: 36.7 s

print(tmp.count(), len(tmp.columns))
552, 9378

%%time
transformed = assembler.transform(tmp).select('User', 'features').cache()
Wall time: 10min 45s

%%time
# elbow search: fit KMeans for k = 3..13 and collect the clustering cost
lst_levels = []
for num in range(3, 14):
    kmeans = KMeans(k=num, maxIter=50)
    model = kmeans.fit(transformed)
    lst_levels.append(model.computeCost(transformed))
# differences between consecutive costs; refit at the first "elbow"
rs = [i - j for i, j in zip(lst_levels, lst_levels[1:])]
for i, j in zip(rs, rs[1:]):
    if i - j < j:
        print(rs.index(i))
        kmeans = KMeans(k=rs.index(i) + 3, maxIter=50)
        model = kmeans.fit(transformed)
        break
Wall time: 1min 32s

Config:

.config("spark.sql.pivotMaxValues", "100000") \
.config("spark.sql.autoBroadcastJoinThreshold", "-1") \
.config("spark.sql.shuffle.partitions", "4") \
.config("spark.sql.inMemoryColumnarStorage.batchSize", "1000") \
Asked Feb 20 '18 by Anton Alekseev


1 Answer

Actually, the solution was found in a map over the RDD.

  1. First of all, we create a map of values for each row.
  2. We also extract all distinct names.
  3. In the penultimate step, we look each name up in the row's map and return its value, or 0 if nothing was found.
  4. Vector assembler on the results.

Advantages:

  1. You don't have to create a wide dataframe with a huge number of columns, and hence you avoid that overhead. (Speed went up from 11 minutes to 1.)
  2. You still work on the cluster and execute your code in the Spark paradigm.

Example of code: Scala implementation.
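
A rough PySpark sketch of the same idea (this is not the linked Scala code; the helper names are mine, and spark / df_states are assumed from the question):

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

# 1. The distinct ObjectPath names define the (fixed) layout of the feature vector.
names = [r[0] for r in df_states.select('ObjectPath').distinct().collect()]
name_idx = {n: i for i, n in enumerate(names)}

# 2. Per user, collect a small map {ObjectPath: max(PropertyFlagValue)} on the RDD.
pairs = (df_states.select('User', 'ObjectPath', 'PropertyFlagValue').rdd
         .map(lambda r: ((r['User'], r['ObjectPath']), r['PropertyFlagValue']))
         .reduceByKey(max)
         .map(lambda kv: (kv[0][0], [(kv[0][1], kv[1])]))
         .reduceByKey(lambda a, b: a + b))

# 3. Look every name up in the user's map, fall back to 0, and build a dense vector.
def to_row(user, kvs):
    values = [0.0] * len(names)
    for name, value in kvs:
        values[name_idx[name]] = float(value)
    return Row(User=user, features=Vectors.dense(values))

transformed = spark.createDataFrame(pairs.map(lambda kv: to_row(*kv))).cache()

# 4. KMeans on the assembled vectors, as in the question.
model = KMeans(k=7, maxIter=50).fit(transformed)

The key point is that the wide pivot is never materialized as thousands of DataFrame columns; each row goes straight from its small map to a dense vector.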

Answered Oct 24 '22 by Anton Alekseev