Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

I want to perform sentiment analysis using Kafka and Spark. What I want to do is read streaming data from Kafka and then use Spark to batch the data. After that, I want to analyze each batch using the function sentimentPredict(), which I have built with TensorFlow. This is what I have done so far:

import os  
# Ask spark-submit to fetch the Kafka 0.8 streaming connector package
# (Scala 2.11 build, version 2.0.2) and then start the PySpark shell.
# NOTE(review): presumably this has to be set before the SparkContext is
# created so the launcher picks it up — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'

#    Spark
from pyspark import SparkContext  
#    Spark Streaming
from pyspark.streaming import StreamingContext  
#    Kafka
from pyspark.streaming.kafka import KafkaUtils  
#    json parsing
import json

from multiprocessing import Lock
# NOTE(review): `lock` is not used anywhere in the code shown here —
# confirm whether it is still needed.
lock = Lock()

# Hyperparameters used when the TensorFlow graph is built further down.
numDimensions = 300  # last dimension of the zero-initialised `data` variable
maxSeqLength = 70  # number of columns of the input placeholder / sentence matrix
batchSize = 24  # number of rows of the input and label placeholders
lstmUnits = 128  # size passed to BasicLSTMCell and first dimension of `weight`
numClasses = 2  # number of columns of `labels` and width of the prediction
iterations = 100000  # not referenced in the code shown here

import numpy as np
import pickle
from nltk.tokenize import word_tokenize
import DataPreprocessing as proc
import time

# Load the vocabulary object and the embedding matrix from local files.
# NOTE: pickle.load can execute arbitrary code contained in the file, so
# dictionary.pickle must come from a trusted source.
with open('dictionary.pickle', 'rb') as handle:
    wordsList = pickle.load(handle)
# Used further down as the lookup table for tf.nn.embedding_lookup.
# presumably shaped (vocabulary size, numDimensions) — TODO confirm
wordVectors = np.load('final_embeddings.npy')

import tensorflow as tf
# Clear the default graph before defining the model.
tf.reset_default_graph()

# Fixed-shape placeholders: every sess.run must feed exactly batchSize rows.
labels = tf.placeholder(tf.float32, [batchSize, numClasses])
input_data = tf.placeholder(tf.int32, [batchSize, maxSeqLength])

# NOTE(review): this Variable is rebound on the next line and never read, but it
# still registers a variable in the graph. It is kept on purpose: removing it
# would shift the auto-generated names of the variables created afterwards and
# could break saver.restore() against the existing checkpoint — confirm before
# deleting.
data = tf.Variable(tf.zeros([batchSize, maxSeqLength, numDimensions]),dtype=tf.float32)
# Replace every word index in input_data by the matching row of wordVectors.
data = tf.nn.embedding_lookup(wordVectors,input_data)

lstmCell = tf.contrib.rnn.BasicLSTMCell(lstmUnits)
# This script only restores a checkpoint and predicts, so dropout must be
# disabled: with output_keep_prob=0.25 three quarters of the LSTM outputs were
# randomly zeroed on every prediction, making the scores noisy and
# non-deterministic. The wrapper itself is kept so the graph structure is
# unchanged.
lstmCell = tf.contrib.rnn.DropoutWrapper(cell=lstmCell, output_keep_prob=1.0)
value, _ = tf.nn.dynamic_rnn(lstmCell, data, dtype=tf.float32)

weight = tf.Variable(tf.truncated_normal([lstmUnits, numClasses]))
bias = tf.Variable(tf.constant(0.1, shape=[numClasses]))
# Swap the first two axes so that axis 0 indexes the sequence position, then
# keep only the output at the last position.
value = tf.transpose(value, [1, 0, 2])
last = tf.gather(value, int(value.get_shape()[0]) - 1)
# Unnormalised class scores, one row per batch entry.
prediction = (tf.matmul(last, weight) + bias)

# Accuracy ops; they are not evaluated anywhere in the code shown here.
correctPred = tf.equal(tf.argmax(prediction,1), tf.argmax(labels,1))
accuracy = tf.reduce_mean(tf.cast(correctPred, tf.float32))

# Restore the trained weights from the newest checkpoint in ./models.
sess = tf.InteractiveSession()
saver = tf.train.Saver()
saver.restore(sess, tf.train.latest_checkpoint('models'))

def getSentenceMatrix(sentence):
    """Convert one sentence into the integer index matrix fed to the model.

    The sentence is cleaned with ``proc.cleanSentences``, split on whitespace
    and every token is replaced by its entry in ``wordsList``. Only row 0 of
    the result is filled; the other rows stay zero so the array matches the
    fixed ``[batchSize, maxSeqLength]`` shape of the input placeholder.

    Args:
        sentence: Text of a single sentence.

    Returns:
        An int32 NumPy array of shape ``(batchSize, maxSeqLength)``.
    """
    sentenceMatrix = np.zeros([batchSize, maxSeqLength], dtype='int32')
    cleanedSentence = proc.cleanSentences(sentence)
    # Keep at most maxSeqLength tokens: writing past the last column used to
    # raise IndexError for long sentences.
    tokens = cleanedSentence.split()[:maxSeqLength]
    for indexCounter, word in enumerate(tokens):
        try:
            if word in wordsList:
                sentenceMatrix[0, indexCounter] = wordsList[word]
            else:
                sentenceMatrix[0, indexCounter] = 0  # index used for unknown words
        except ValueError:
            # Fallback kept from the original code; presumably hit when the
            # looked-up value cannot be stored as an int — TODO confirm.
            sentenceMatrix[0, indexCounter] = 399999  # index used for unknown words
    return sentenceMatrix

def sentimentCorrect(data):
    """Spell-correct a sentence, then score its sentiment with the restored model.

    Each space-separated word is passed through ``spell.correction``; the
    corrected sentence is stemmed, looked up and cleaned via ``proc`` and fed
    to the TensorFlow session.

    Args:
        data: Text of a single sentence.

    Returns:
        A dict with the keys ``"sentences"`` (the original, uncorrected text),
        ``"positiveScores"``, ``"negativeScores"`` (scores as strings) and
        ``"sentiment"`` (``"Positive"`` or ``"Negative"``), or ``None`` when
        any step failed.
    """
    try:
        sentiment_results = {}
        words = data.split(' ')
        # NOTE(review): `spell` is not defined anywhere in the code shown here;
        # unless it is created elsewhere this line raises NameError on every
        # call — confirm.
        exact = ' '.join(spell.correction(word) for word in words)
        inputMatrix = getSentenceMatrix(proc.cleanSentences(proc._lookup_words(proc.stemmer.stem(exact))))
        predictedSentiment = sess.run(prediction, {input_data: inputMatrix})[0]
        # predictedSentiment[0] represents output score for positive sentiment
        # predictedSentiment[1] represents output score for negative sentiment
        print("Positive : ", predictedSentiment[0])
        print("Negative : ", predictedSentiment[1])
        if predictedSentiment[0] > predictedSentiment[1]:
            result = "Positive"
        else:
            result = "Negative"

        sentiment_results["sentences"] = data
        sentiment_results["positiveScores"] = str(predictedSentiment[0])
        sentiment_results["negativeScores"] = str(predictedSentiment[1])
        sentiment_results["sentiment"] = result

        return sentiment_results
    except Exception as exc:
        # Still best-effort (no crash, caller gets None), but no longer a bare
        # `except:`: KeyboardInterrupt/SystemExit propagate and the actual
        # error is reported instead of being hidden behind the delay message.
        print("sentimentCorrect failed:", repr(exc))
        print("Delay for 5 seconds")
        time.sleep(5)
        return None

def sentimentPredict(data):
    """Score the sentiment of one sentence with the restored model.

    The text is stemmed, looked up and cleaned through ``proc``, converted to
    an index matrix and run through the TensorFlow session. Both scores are
    printed before the result is returned. Exceptions are not handled here and
    propagate to the caller.

    Args:
        data: Text of a single sentence.

    Returns:
        A dict with the keys ``"sentences"`` (the input text),
        ``"positiveScores"``, ``"negativeScores"`` (scores as strings) and
        ``"sentiment"`` (``"Positive"`` or ``"Negative"``).
    """
    stemmed = proc.stemmer.stem(data)
    looked_up = proc._lookup_words(stemmed)
    model_input = getSentenceMatrix(proc.cleanSentences(looked_up))

    # Only the first row of the batch carries the sentence.
    scores = sess.run(prediction, {input_data: model_input})[0]

    # scores[0] is the output score for positive sentiment,
    # scores[1] is the output score for negative sentiment.
    print("Positive : ", scores[0])
    print("Negative : ", scores[1])
    label = "Positive" if scores[0] > scores[1] else "Negative"

    return {
        "sentences": data,
        "positiveScores": str(scores[0]),
        "negativeScores": str(scores[1]),
        "sentiment": label,
    }

sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01")
sc.setLogLevel("WARN")

# Streaming context with a batch interval of 2 seconds.
ssc = StreamingContext(sc, 2)
# Receiver-based alternative, kept for reference:
#kafkaStream = KafkaUtils.createStream(ssc, 'NLP:2181', 'spark-streaming', {'weblogs':1})
kafkaStream = KafkaUtils.createDirectStream(ssc, topics = ['weblogs'], kafkaParams = {"metadata.broker.list": "NLP:9092"})
# Every record is indexed as a pair; element 1 is the message value, which is
# parsed as JSON. json.loads raises for a message that is not valid JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

id_twitter = parsed.map(lambda tweet: tweet["id"])
id_twitter.saveAsTextFiles("file:///D:/id-tweet.txt")
id_twitter.count().map(lambda x:'ID in this batch: %s' % x).pprint()

name = parsed.map(lambda tweet: tweet["name"])
name.saveAsTextFiles("file:///D:/name-tweet.txt")
# NOTE(review): this prints the tweet count a second time; possibly a count of
# `name` was intended — confirm.
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()

text = parsed.map(lambda tweet: tweet["text"])
text.saveAsTextFiles("file:///D:/text-tweet.txt")

# map, not mapPartitions: sentimentPredict takes ONE tweet text and returns ONE
# result dict. mapPartitions would hand it an iterator over the whole partition
# and then iterate over the returned dict (i.e. over its keys).
# NOTE(review): sentimentPredict reads the module-level TensorFlow session and
# tensors, which Spark has to pickle in order to ship the function to the
# workers. That is the likely source of "can't pickle _thread.RLock objects";
# the model probably needs to be built/restored inside the worker-side
# function instead — confirm.
sentiment = text.map(sentimentPredict)
sentiment.saveAsTextFiles("file:///D:/sentiment-tweet.txt")

ssc.start()
ssc.awaitTermination()

But when I ran my code using spark-submit2 in the terminal, I got this error:

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects
2018-04-09 16:21:48 ERROR JobScheduler:91 - Error generating jobs for time 1523265708000 ms
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
    return r._jrdd
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
    self._jrdd_deserializer, profiler)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
    return cloudpickle.dumps(obj, 2)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Traceback (most recent call last):
  File "D:/PROJECT_MABESPOLRI/progress_spark_sentiment/spark+sentiment.py", line 171, in <module>
    ssc.awaitTermination()
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\context.py", line 206, in awaitTermination
    self._jssc.awaitTermination()
  File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\CS\Anaconda3\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o22.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 409, in dump
    self.save(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 751, in save_tuple
    save(element)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 805, in _batch_appends
    save(x)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 781, in save_list
    self._batch_appends(obj)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 808, in _batch_appends
    save(tmp[0])
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 804, in save_reduce
    save(state)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 847, in _batch_setitems
    save(v)
  File "C:\Users\CS\Anaconda3\lib\pickle.py", line 496, in save
    rv = reduce(self.proto)
TypeError: can't pickle _thread.RLock objects

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\streaming\util.py", line 67, in call
    return r._jrdd
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2470, in _jrdd
    self._jrdd_deserializer, profiler)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2403, in _wrap_function
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\rdd.py", line 2389, in _prepare_for_python_RDD
    pickled_command = ser.dumps(command)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\serializers.py", line 568, in dumps
    return cloudpickle.dumps(obj, 2)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 918, in dumps
    cp.dump(obj)
  File "C:\Users\CS\Anaconda3\lib\site-packages\pyspark\cloudpickle.py", line 249, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

        at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
        at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Can someone give me a clue about how to solve this problem? Thank you.

like image 406
Sofian Fadli Avatar asked Apr 09 '18 09:04

Sofian Fadli


1 Answers

Try putting the Spark code in a separate function that takes only the necessary arguments. When you run Spark operations, Spark tries to pickle everything in the current scope (in your case, the top level of the script); if it encounters an object that can't be pickled, it throws an error. In your case, I suspect the error may be caused by the variable "lock".

like image 60
Mikhail Berlinkov Avatar answered Oct 15 '22 11:10

Mikhail Berlinkov