I am working on the word count problem in Spark using Python, but I run into a problem when I try to save the output RDD to a text file with the .saveAsTextFile command. Here is my code. Please help me, I am stuck. I appreciate your time.
import re
from pyspark import SparkConf, SparkContext

def normalizewords(text):
    return re.compile(r'\W+',re.UNICODE).split(text.lower())

conf=SparkConf().setMaster("local[2]").setAppName("sorted result")
sc=SparkContext(conf=conf)
input=sc.textFile("file:///home/cloudera/PythonTask/sample.txt")
words=input.flatMap(normalizewords)
wordsCount=words.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
sortedwordsCount=wordsCount.map(lambda (x,y):(y,x)).sortByKey()
results=sortedwordsCount.collect()
for result in results:
    count=str(result[0])
    word=result[1].encode('ascii','ignore')
    if(word):
        print word +"\t\t"+ count
results.saveAsTextFile("/var/www/myoutput")
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3a://, etc. URI) and reads it as a collection of lines. Here is an example invocation: JavaRDD<String> distFile = sc.
You can save an RDD using the saveAsObjectFile and saveAsTextFile methods, and you can read one using the textFile and sequenceFile functions of SparkContext.
To print the contents of an RDD, we can use the collect action or the foreach action. RDD.collect() returns all the elements of the dataset as an array in the driver program, and we can print the elements by looping over this array. RDD.foreach(f) runs a function f on each element of the dataset.
Saving text files: Spark provides a function called saveAsTextFile(), which takes a path and writes the contents of the RDD to it. The path is treated as a directory, and multiple output files (one per partition) are written into that directory.
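Because the output path is a directory, reading the result back outside Spark means combining its part files. The following is a minimal sketch, assuming the output was written to a local filesystem and uses Spark's usual part-NNNNN file names; the helper name read_spark_text_output is hypothetical and not part of Spark.

```python
import glob
import os


def read_spark_text_output(output_dir):
    """Return all lines from the part-* files in a saveAsTextFile directory.

    Hypothetical helper for illustration; assumes a local directory.
    """
    lines = []
    # Sort so part-00000, part-00001, ... are read in partition order.
    for part_path in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(part_path) as part_file:
            lines.extend(line.rstrip("\n") for line in part_file)
    return lines
```

The "part-*" pattern skips the _SUCCESS marker file that Spark places in the same directory.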
Since you called results=sortedwordsCount.collect(), results is no longer an RDD. It is a normal Python list (here, a list of tuples).
As you know, list is a Python object/data structure, and append is a method that adds an element to it.
>>> x = []
>>> x.append(5)
>>> x
[5]
Similarly, RDD is Spark's object/data structure, and saveAsTextFile is a method that writes it to a file. The important thing is that an RDD is a distributed data structure.
So we cannot use append on an RDD or saveAsTextFile on a list. collect is a method on RDD that brings the RDD's contents into driver memory.
As mentioned in the comments, either save sortedwordsCount with saveAsTextFile, or open a file in Python and use results to write to it.
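The second option, writing the collected list from plain Python, might look like the sketch below. It assumes results is a list of (count, word) tuples as produced by the code in the question; the function name write_counts and the output file name in the usage line are hypothetical.

```python
import io


def write_counts(results, path):
    """Write (count, word) tuples to a tab-separated text file."""
    # io.open accepts an encoding on both Python 2 and Python 3.
    with io.open(path, "w", encoding="utf-8") as out:
        for count, word in results:
            if word:  # skip the empty string that splitting on \W+ can produce
                out.write(u"{}\t{}\n".format(word, count))
```

It would be called after the collect, for example write_counts(results, "wordcount_output.txt"). This only makes sense when the collected list fits comfortably in driver memory.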
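Change results=sortedwordsCount.collect() to results=sortedwordsCount, because with .collect() results will be a list. The printing loop would then need to iterate over results.collect(), since an RDD cannot be looped over directly.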
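Putting the answers together, one possible corrected version of the script keeps the RDD for saving and collects it separately for printing. This is only a sketch: it assumes an existing SparkContext is passed in as sc, the function names are hypothetical, and the lambda is written without tuple unpacking so that it also runs on Python 3.

```python
import re


def normalize_words(text):
    """Lower-case a line and split it on runs of non-word characters."""
    return re.compile(r"\W+", re.UNICODE).split(text.lower())


def sorted_word_counts(sc, input_path):
    """Build an RDD of (count, word) pairs sorted by count.

    sc is assumed to be an already created SparkContext.
    """
    words = sc.textFile(input_path).flatMap(normalize_words)
    counts = words.map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
    # Swap to (count, word) so that sortByKey orders by count.
    return counts.map(lambda pair: (pair[1], pair[0])).sortByKey()


def save_and_print(sc, input_path, output_path):
    sorted_counts = sorted_word_counts(sc, input_path)
    # saveAsTextFile is an RDD method, so call it on the RDD itself.
    sorted_counts.saveAsTextFile(output_path)
    # collect() returns a plain Python list, which is fine for printing.
    for count, word in sorted_counts.collect():
        if word:
            print(word + "\t\t" + str(count))
```

With the sc from the question, this could be called as save_and_print(sc, "file:///home/cloudera/PythonTask/sample.txt", "/var/www/myoutput"). Note that saveAsTextFile raises an error if the output directory already exists.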