Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Add PySpark RDD as new column to pyspark.sql.dataframe

I have a pyspark.sql.dataframe where each row is a news article. I then have a RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my dataframe of new articles. I tried

df.withColumn('words', words_rdd )

but I get the error

AssertionError: col should be Column

The DataFrame looks something like this

Articles
the cat and dog ran
we went to the park
today it will rain

but I have 3k news articles.

I applied a function to clean the text such as remove stop words and I have a RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

I'm trying to get my Dataframe to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]
like image 571
jakko Avatar asked Feb 08 '17 21:02

jakko


3 Answers

Disclaimer:

Spark DataFrame in general has no strictly defined order. Use at your own risk.

Add index to existing DataFrame:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

Add index to RDD and convert to DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

Join both and select required fields:

df_index.join(words_df, "id").select("data.*", "words")

Caution

There are different solutions, which might work in specific cases, but don't guarantee performance and or correctness. These include:

  • Using monotonically_increasing_id as a join key - in general case not correct.
  • Using row_number() window function as a join key - unacceptable performance implication and in general not correct if there is no specific order defined.
  • Using zip on RDDs - can work if and only if both structures have the same data distribution (should work in this case).

Note:

In this specific case you shouldn't need RDD. pyspark.ml.feature provides a variety of Transformers, which should work well for you.

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
     ["the cat and dog ran", "we went to the park", "today it will rain"],
         "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"), 
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

The list of stop words can be provided using stopWords parameter of the StopWordsRemover, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
like image 89
Alper t. Turker Avatar answered Oct 23 '22 08:10

Alper t. Turker


Why do you want to join the rdd back to the dataframe, I would rather create a new column from "Articles" directly. There are multiple ways to do it, here are my 5 cents:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc)    # sc is the sparkcontext

x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

df2 = df.map(lambda x:tuple([x.Articles,x.Articles.split(' ')])).toDF(['Articles','words'])
df2.show()

You get the following output:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]

Let me know if you were looking to achieve something else.

like image 45
Gaurav Dhama Avatar answered Oct 23 '22 10:10

Gaurav Dhama


A simple approach but effective would be to use udf. You can:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None], 
"string" ).toDF("Articles")

split_words = udf(lambda x : x.split(' ') if x is not None else x, StringType())
df = df.withColumn('Words', split_words(df['Articles']))

df.show(10,False)
>>
+-------------------+-------------------------+
|Articles           |Words                    |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain]  |
|null               |null                     |
+-------------------+-------------------------+

I added check for None because it very usual to have in your data bad lines. You can drop them easily after splitting or before ,with dropna.

But in my opinion if you want to do this as a preparation task for Text analytics it would be probably to your best interest to build a Pipeline as @user9613318 suggests in his answer

like image 2
Michail N Avatar answered Oct 23 '22 08:10

Michail N