
Efficient text preprocessing using PySpark (clean, tokenize, stopwords, stemming, filter)

I recently started learning Spark from the book "Learning Spark". The theory is clear, but in practice I found that I first need to preprocess my text, and the book has no concrete advice on that.

The first thing I took into account is that DataFrames are now preferred over RDDs, so I wrote my preprocessing with DataFrames.

Required operations:

  1. Removing punctuation from the text (regexp_replace)
  2. Tokenization (Tokenizer)
  3. Removing stop words (StopWordsRemover)
  4. Stemming (SnowballStemmer)
  5. Filtering out short words (udf)

My code is:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.types import ArrayType, StringType
from nltk.stem.snowball import SnowballStemmer

spark = SparkSession.builder \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.cores", "4") \
    .getOrCreate()
df = spark.read.json('datasets/entitiesFull/full').select('id', 'text')

# Clean text
df_clean = df.select('id', (lower(regexp_replace('text', "[^a-zA-Z\\s]", "")).alias('text')))

# Tokenize text
tokenizer = Tokenizer(inputCol='text', outputCol='words_token')
df_words_token = tokenizer.transform(df_clean).select('id', 'words_token')

# Remove stop words
remover = StopWordsRemover(inputCol='words_token', outputCol='words_clean')
df_words_no_stopw = remover.transform(df_words_token).select('id', 'words_clean')

# Stem text
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
df_stemmed = df_words_no_stopw.withColumn("words_stemmed", stemmer_udf("words_clean")).select('id', 'words_stemmed')

# Keep only words with at least 3 characters
filter_length_udf = udf(lambda row: [x for x in row if len(x) >= 3], ArrayType(StringType()))
df_final_words = df_stemmed.withColumn('words', filter_length_udf(col('words_stemmed')))

Processing takes a very long time; the entire dataset is 60 GB. Does it make sense to use RDDs instead? Will caching help? How can I optimize the preprocessing?

I am testing the implementation on my local machine first (Ubuntu, 6 GB RAM, 4 CPUs) and will then try it on a cluster. Alternative solutions are also welcome. Thanks!

vndywarhol asked Dec 02 '18 10:12



1 Answer

JSON is typically the worst file format for Spark analysis, especially if it's a single 60GB JSON file. Spark works well with 1GB Parquet files. A little pre-processing will help a lot:

temp_df = spark.read.json('datasets/entitiesFull/full').select('id', 'text').repartition(60)
temp_df.write.parquet('some/other/path')
df = spark.read.parquet('some/other/path')
# ... continue the rest of the analysis
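
The 60 in repartition(60) presumably comes from dividing the 60 GB input by the roughly 1 GB target per file. A minimal sketch of that arithmetic follows; the helper name partitions_for is made up for illustration and is not a Spark API:

```python
import math


def partitions_for(total_bytes, target_file_bytes=1024**3):
    """Number of partitions so each holds roughly target_file_bytes of input."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


# 60 GB of input split into ~1 GB chunks
print(partitions_for(60 * 1024**3))  # 60
```

Parquet is columnar and compressed, so the files written out will usually be smaller than the share of JSON they came from. Treat the number as a starting point and adjust it after looking at the actual output sizes.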

Wrapping the SnowballStemmer in a UDF isn't ideal from a performance perspective, but it's the most realistic option unless you're comfortable writing algorithms in low-level Java bytecode. I created a Porter stemming algorithm in ceja using a UDF as well.
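
If you stay with a UDF, one cheap thing to try is memoizing stems inside it, since natural-language tokens repeat a lot. The sketch below is only an idea to benchmark, not a guaranteed win. It does the stemming and the length filter in one pass, and the names make_stem_and_filter, add_words_column and max_cache are made up for illustration. The Spark and NLTK imports sit inside the wiring function so the plain-Python part can be tried on its own:

```python
def make_stem_and_filter(stem, min_len=3, max_cache=200_000):
    """Build a function that stems a token list and drops short stems.

    stem is any callable str -> str, e.g. SnowballStemmer('english').stem.
    """
    cache = {}  # token -> stem; each Python worker fills its own copy

    def process(tokens):
        if tokens is None:  # null arrays arrive as None
            return None
        result = []
        for token in tokens:
            stemmed = cache.get(token)
            if stemmed is None:
                stemmed = stem(token)
                if len(cache) < max_cache:  # keep memory bounded
                    cache[token] = stemmed
            if len(stemmed) >= min_len:
                result.append(stemmed)
        return result

    return process


def add_words_column(df, input_col="words_clean", output_col="words"):
    """Wire the function into a DataFrame (requires pyspark and nltk)."""
    from nltk.stem.snowball import SnowballStemmer
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType

    process = make_stem_and_filter(SnowballStemmer(language="english").stem)
    stem_udf = udf(process, ArrayType(StringType()))
    return df.withColumn(output_col, stem_udf(input_col))
```

With the question's code this would be called as add_words_column(df_words_no_stopw). How much the cache helps depends on how repetitive the vocabulary is, so time it on a sample before running the full 60 GB.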

Here's an example of a native implementation of a Spark function. It's possible to implement, but not easy.
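
For the short-word filter specifically, no custom native code should be needed: Spark SQL has had a built-in filter higher-order function for arrays since 2.4, which can stand in for filter_length_udf. A rough sketch, assuming Spark 2.4 or later; the helper names short_word_filter_sql and drop_short_words are made up for illustration:

```python
def short_word_filter_sql(column, min_len=3):
    """Spark SQL expression keeping array elements with length >= min_len."""
    return f"filter(`{column}`, x -> length(x) >= {int(min_len)})"


def drop_short_words(df, input_col="words_stemmed", output_col="words", min_len=3):
    """Apply the built-in array filter instead of a Python UDF (requires pyspark)."""
    from pyspark.sql.functions import expr

    return df.withColumn(output_col, expr(short_word_filter_sql(input_col, min_len)))
```

Applied to the question's code, drop_short_words(df_stemmed) would replace the last UDF, so that step runs inside the JVM without sending rows to Python.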

Powers answered Oct 15 '22 14:10
