I want to make sentiment analysis using Kafka and Spark. What I want to do is read Streaming Data from Kafka and then using Spark to batch the data. After that, I want to analyze the batch using function sentimentPredict() that I have maked using Tensorflow. This is what i have do so far ...
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
# Spark
from pyspark import SparkContext
# Spark Streaming
from pyspark.streaming import StreamingContext
# Kafka
from pyspark.streaming.kafka import KafkaUtils
# json parsing
import json
from multiprocessing import Lock
lock = Lock()
numDimensions = 300
maxSeqLength = 70
batchSize = 24
lstmUnits = 128
numClasses = 2
iterations = 100000
import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import DataPreprocessing as proc
import time
with open('dictionary.pickle', 'rb') as handle:
wordsList = pickle.load(handle)
wordVectors = np.load('final_embeddings.npy')
import tensorflow as tf
tf.reset_default_graph()
labels = tf.placeholder(tf.float32, [batchSize, numClasses])
input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])
data = tf.Variable(tf.zeros([batchSize, maxSeqLength, numDimensions]),dtype=tf.float32)
data = tf.nn.embedding_lookup(wordVectors,input_data)
lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=0.25)
value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)
weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
value = tf.transpose(value, [1, 0, 2])
last = tf.gather(value, int(value.get_shape()[0]) - 1)
prediction = (tf.matmul(last, weight) + bias)
correctPred = tf.equal(tf.argmax(prediction,1), tf.argmax(labels,1))
accuracy = tf.reduce_mean(tf.cast(correctPred, tf.float32))
sess = tf.InteractiveSession()
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('models'))
def getSentenceMatrix(sentence):
arr = np.zeros([batchSize, maxSeqLength])
sentenceMatrix = np.zeros([batchSize,maxSeqLength], dtype='int32')
cleanedSentence = proc.cleanSentences(sentence)
split = cleanedSentence.split()
for indexCounter,word in enumerate(split):
try:
if word in wordsList:
sentenceMatrix[0,indexCounter] = wordsList[word]
else:
sentenceMatrix[0,indexCounter] = 0 #Vector for unkown words
except ValueError:
sentenceMatrix[0,indexCounter] = 399999 #Vector for unkown words
return sentenceMatrix
def sentimentCorrect(data):
try:
sentiment_results = {}
#sentences = data['sentences']
string = data.split(' ')
exact = [(spell.correction(word)) for word in string]
exact = ' '.join(exact)
inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(exact))))
predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
# predictedSentiment[0] represents output score for positive sentiment
# predictedSentiment[1] represents output score for negative sentiment
print("Positive : ",predictedSentiment[0])
print("Negative : ",predictedSentiment[1])
if (predictedSentiment[0] > predictedSentiment[1]):
result = "Positive"
else:
result = "Negative"
sentiment_results["sentences"] = data
sentiment_results["positiveScores"] = str(predictedSentiment[0])
sentiment_results["negativeScores"] = str(predictedSentiment[1])
sentiment_results["sentiment"] = result
return sentiment_results
except:
print("Delay for 5 seconds")
time.sleep(5)
def sentimentPredict(data):
try:
sentiment_results = {}
#sentences = data['sentences']
#string = sentences.split(' ')
#exact = [get_exact_words(word) for word in string]
#exact = ' '.join(exact)
inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(data))))
predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
# predictedSentiment[0] represents output score for positive sentiment
# predictedSentiment[1] represents output score for negative sentiment
print("Positive : ",predictedSentiment[0])
print("Negative : ",predictedSentiment[1])
if (predictedSentiment[0] > predictedSentiment[1]):
result = "Positive"
else:
result = "Negative"
sentiment_results["sentences"] = data
sentiment_results["positiveScores"] = str(predictedSentiment[0])
sentiment_results["negativeScores"] = str(predictedSentiment[1])
sentiment_results["sentiment"] = result
return sentiment_results
except TypeError:
raise
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
#kafkaStream = KafkaUtils.createStream(ssc, 'NLP:2181', 'spark-streaming', {'weblogs':1})
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['weblogs'], kafkaParams = {"metadata.broker.list": "NLP:9092"})
# Here to parse the inbound messages isn't valid JSON
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.saveAsTextFiles("file:///D:/spark-kafka.txt")
id_twitter = parsed.map(lambda tweet: tweet["id"])
id_twitter.saveAsTextFiles("file:///D:/id-tweet.txt")
id_twitter.count().map(lambda x:'ID in this batch: %s' % x).pprint()
name = parsed.map(lambda tweet: tweet["name"])
name.saveAsTextFiles("file:///D:/name-tweet.txt")
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
text = parsed.map(lambda tweet: tweet["text"])
text.saveAsTextFiles("file:///D:/text-tweet.txt")
sentiment = text.mapPartitions(sentimentPredict)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")
#sentiment_result = text.map(sentimentPredict(text))
#sentiment_result = text.flatMap(sentimentPredict(text))
#print(sentiment_result)
#parsed.map(lambda x:'Tweets in this batch: %s' % x).pprint()
#parsed.encode("utf-8").pprint()
#print(parsed)
#print(soup.encode("utf-8"))
#sentiment_result.saveAsTextFiles("file:///D:/spark-kafka.txt")
ssc.start()
ssc.awaitTermination()
But, when I ran my code using spark-submit2 in terminal, i got this error ...
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
2018-04-09 16:21:48 ERROR JobScheduler:91 - Error generating jobs for time 1523265708000 ms
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Traceback (most recent call last):
File "D:/PROJECT_MABESPOLRI/progress_spark_sentiment/spark+sentiment.py", line 171, in <module>
ssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 206, in awaitTermination
self._jssc.awaitTermination()
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
self.save(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
save(element)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
save(x)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
self._batch_appends(obj)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
save(tmp[0])
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
save(state)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
save(v)
File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
return r._jrdd
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
self._jrdd_deserializer, profiler)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
return cloudpickle.dumps(obj, 2)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
cp.dump(obj)
File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Can someone give me a clue how to solving this problem ??? Thank you
Try to put spark code in a separate function with only necessary arguments. When you run spark operations it tries to pickle everything in the current scope (in your case on a top-level), if it encounters some object which can't be pickled it throws an error. In your case, I suspect the error is perhaps caused by variable "lock".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With