I have a pyspark.sql.dataframe where each row is a news article. I then have a RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my dataframe of new articles. I tried
df.withColumn('words', words_rdd )
but I get the error
AssertionError: col should be Column
The DataFrame looks something like this
Articles
the cat and dog ran
we went to the park
today it will rain
but I have 3k news articles.
I applied a function to clean the text such as remove stop words and I have a RDD that looks like this:
[[cat, dog, ran],[we, went, park],[today, will, rain]]
I'm trying to get my Dataframe to look like this:
Articles Words
the cat and dog ran [cat, dog, ran]
we went to the park [we, went, park]
today it will rain [today, will, rain]
Disclaimer:
Spark DataFrame
in general has no strictly defined order. Use at your own risk.
Add index to existing DataFrame
:
from pyspark.sql.types import *
df_index = spark.createDataFrame(
df.rdd.zipWithIndex(),
StructType([StructField("data", df.schema), StructField("id", LongType())])
)
Add index to RDD
and convert to DataFrame
:
words_df = spark.createDataFrame(
words_rdd.zipWithIndex(),
StructType([
StructField("words", ArrayType(StringType())),
StructField("id", LongType())
])
)
Join both and select required fields:
df_index.join(words_df, "id").select("data.*", "words")
Caution
There are different solutions, which might work in specific cases, but don't guarantee performance and or correctness. These include:
monotonically_increasing_id
as a join
key - in general case not correct.row_number()
window function as a join key - unacceptable performance implication and in general not correct if there is no specific order defined.zip
on RDDs
- can work if and only if both structures have the same data distribution (should work in this case).Note:
In this specific case you shouldn't need RDD
. pyspark.ml.feature
provides a variety of Transformers
, which should work well for you.
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
df = spark.createDataFrame(
["the cat and dog ran", "we went to the park", "today it will rain"],
"string"
).toDF("Articles")
Pipeline(stages=[
RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# | Articles| Tokens| Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...| [went, park]|
# | today it will rain|[today, it, will,...| [today, rain]|
# +-------------------+--------------------+---------------+
The list of stop words can be provided using stopWords
parameter of the StopWordsRemover
, for example:
StopWordsRemover(
inputCol="Tokens",
outputCol="Words",
stopWords=["the", "and", "we", "to", "it"]
)
Why do you want to join the rdd back to the dataframe, I would rather create a new column from "Articles" directly. There are multiple ways to do it, here are my 5 cents:
from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc) # sc is the sparkcontext
x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)
df2 = df.map(lambda x:tuple([x.Articles,x.Articles.split(' ')])).toDF(['Articles','words'])
df2.show()
You get the following output:
Articles words
the cat and dog ran [the, cat, and, dog, ran]
we went to the park [we, went, to, the, park]
today it will rain [today, it, will, rain]
Let me know if you were looking to achieve something else.
A simple approach but effective would be to use udf. You can:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None],
"string" ).toDF("Articles")
split_words = udf(lambda x : x.split(' ') if x is not None else x, StringType())
df = df.withColumn('Words', split_words(df['Articles']))
df.show(10,False)
>>
+-------------------+-------------------------+
|Articles |Words |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain] |
|null |null |
+-------------------+-------------------------+
I added check for None because it very usual to have in your data bad lines. You can drop them easily after splitting or before ,with dropna.
But in my opinion if you want to do this as a preparation task for Text analytics it would be probably to your best interest to build a Pipeline as @user9613318 suggests in his answer
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With