Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

MatchError while accessing vector column in Spark 2.0

I am trying to create a LDA model on a JSON file.

Creating a spark context with the JSON file :

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("my-spark-app")
  .config("spark.some.config.option", "config-value")
  .getOrCreate()

 val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt")

Displaying the df should show the DataFrame

display(df)

Tokenize the text

import org.apache.spark.ml.feature.RegexTokenizer

// Set params for RegexTokenizer
val tokenizer = new RegexTokenizer()
                .setPattern("[\\W_]+")
                .setMinTokenLength(4) // Filter away tokens with length < 4
                .setInputCol("text")
                .setOutputCol("tokens")

// Tokenize document
val tokenized_df = tokenizer.transform(df)

This should be displaying the tokenized_df

display(tokenized_df)

Get the stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords

Optional: copying the stopwords to the tmp folder

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords

Collecting all the stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect()

Filtering out the stopwords

 import org.apache.spark.ml.feature.StopWordsRemover

 // Set params for StopWordsRemover
 val remover = new StopWordsRemover()
                   .setStopWords(stopwords) // This parameter is optional
                   .setInputCol("tokens")
                   .setOutputCol("filtered")

 // Create new DF with Stopwords removed
 val filtered_df = remover.transform(tokenized_df)

Displaying the filtered df should verify the stopwords got removed

 display(filtered_df)

Vectorizing the frequency of occurrence of words

 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.Row
 import org.apache.spark.ml.feature.CountVectorizer

 // Set params for CountVectorizer
 val vectorizer = new CountVectorizer()
               .setInputCol("filtered")
               .setOutputCol("features")
               .fit(filtered_df)

Verify the vectorizer

 vectorizer.transform(filtered_df)
           .select("id", "text","features","filtered").show()

After this I am seeing an issue in fitting this vectorizer in LDA. The issue which I believe is CountVectorizer is giving sparse vector but LDA requires dense vector. Still trying to figure out the issue.

Here is the exception where map is not able to convert.

import org.apache.spark.mllib.linalg.Vector
val ldaDF = countVectors.map { 
             case Row(id: String, countVector: Vector) => (id, countVector) 
            }
display(ldaDF)

Exception :

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)

There is a working sample for LDA which is not throwing any issue

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}

val a = Vectors.dense(Array(1.0,2.0,3.0))
val b = Vectors.dense(Array(3.0,4.0,5.0))
val df = Seq((1L,a),(2L,b),(2L,a)).toDF

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } 

val model = new LDA().setK(3).run(ldaDF.javaRDD)
display(df)

The only difference is in the second snippet we are having a dense matrix.

like image 538
Nabs Avatar asked Dec 14 '22 04:12

Nabs


2 Answers

This has nothing to do with sparsity. Since Spark 2.0.0 ML Transformers no longer generate o.a.s.mllib.linalg.VectorUDT but o.a.s.ml.linalg.VectorUDT and are mapped locally to subclasses of o.a.s.ml.linalg.Vector. These are not compatible with old MLLib API which is moving towards deprecation in Spark 2.0.0.

You can convert between to "old" using Vectors.fromML:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vectors => NewVectors}

OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0))
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0)))

but it make more sense to use ML implementation of LDA if you already use ML transformers.

For convenience you can use implicit conversions:

import scala.languageFeature.implicitConversions

object VectorConversions {
  import org.apache.spark.mllib.{linalg => mllib}
  import org.apache.spark.ml.{linalg => ml}

  implicit def toNewVector(v: mllib.Vector) = v.asML
  implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v)
}
like image 57
zero323 Avatar answered Dec 21 '22 10:12

zero323


I changed:

val ldaDF = countVectors.map { 
             case Row(id: String, countVector: Vector) => (id, countVector) 
            }

to:

val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) => 
                               (docId.toLong, Vectors.fromML(features)) }

And it worked like a charm! It is aligned with what @zero323 has written.

List of imports:

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
like image 22
AmirHd Avatar answered Dec 21 '22 12:12

AmirHd