I'm new to PySpark. Below is the format of the JSON messages I receive from Kafka.
{
"header": {
"platform":"atm",
"version":"2.0"
},
"details":[
{
"abc":"3",
"def":"4"
},
{
"abc":"5",
"def":"6"
},
{
"abc":"7",
"def":"8"
}
]
}
How can I read the values of all "abc" and "def" entries in details and add them to a new list like this: [(1,2),(3,4),(5,6),(7,8)]? The new list will be used to create a Spark DataFrame. How can I do this in PySpark? I tried the code below.
parsed = messages.map(lambda (k,v): json.loads(v))
list = []
summed = parsed.map(lambda detail:list.append((String(['mcc']), String(['mid']), String(['dsrc']))))
output = summed.collect()
print output
It produces the error 'too many values to unpack'. The full error message, raised at the statement summed.collect(), is below.
16/09/12 12:46:10 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/09/12 12:46:10 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/09/12 12:46:10 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/09/12 12:46:10 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/hdp/2.3.4.0-3485/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "", line 1, in
ValueError: too many values to unpack
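The ValueError most likely comes from the tuple-unpacking lambda (k,v): it fails whenever an element of messages is not exactly a two-item tuple. Separately, list.append inside map returns None and runs on the executors, so the driver-side list is never filled. A minimal sketch of one way around both problems is to return the tuples from a plain function and let flatMap flatten them. The function name extract_pairs and the sample string are made up for illustration, and the commented Spark lines assume messages is an RDD of (key, value) tuples.

```python
import json

def extract_pairs(raw_message):
    """Return one (abc, def) tuple per entry in the message's "details" array."""
    record = json.loads(raw_message)
    # .get() with defaults, so a message without "details" yields an empty list
    return [(d.get("abc"), d.get("def")) for d in record.get("details", [])]

# Hypothetical sample message with the same shape as the JSON in the question
sample = (
    '{"header": {"platform": "atm", "version": "2.0"}, '
    '"details": [{"abc": "3", "def": "4"}, '
    '{"abc": "5", "def": "6"}, '
    '{"abc": "7", "def": "8"}]}'
)

pairs = extract_pairs(sample)
print(pairs)

# On the cluster (assumption: `messages` is an RDD of (key, value) tuples):
#   rows = messages.flatMap(lambda kv: extract_pairs(kv[1]))
#   df = sqlContext.createDataFrame(rows, ["abc", "def"])
```

Indexing kv[1] instead of unpacking in the lambda signature also keeps the code valid on Python 3, where lambda (k,v) is a syntax error.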
A JSON file is read using the spark.read.json("path") function. By default the reader expects one complete JSON record per line. To read records that are spread across multiple lines, set the multiline option to true; it is false by default.
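To see why the multiline option matters, here is a small plain-Python illustration (not Spark itself) of what happens when a pretty-printed record is parsed one line at a time, compared with parsing the whole text at once. The helper parse_line_by_line is hypothetical and only mimics a line-delimited reader. In Spark the equivalent switch would be spark.read.option("multiLine", True).json("path").

```python
import json

# A pretty-printed record, spread over several lines like the one in the question
pretty = """{
  "header": {"platform": "atm", "version": "2.0"},
  "details": [{"abc": "3", "def": "4"}]
}"""

def parse_line_by_line(text):
    """Mimic a line-delimited reader: every line must be a complete JSON value."""
    good, bad = [], []
    for line in text.splitlines():
        try:
            good.append(json.loads(line))
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError
            bad.append(line)
    return good, bad

good, bad = parse_line_by_line(pretty)   # no line is valid JSON on its own
whole = json.loads(pretty)               # the full text parses fine

print(len(good), len(bad))
print(whole["details"])
```

None of the four lines parses by itself, which is roughly why a multi-line file read without the option tends to come back as corrupt records rather than usable rows.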
Using spark.read.json("path") or spark.read.format("json").load("path") you can read a JSON file into a Spark (or PySpark) DataFrame. Both methods take a file path as an argument.
import pyspark
from pyspark import SparkConf, SparkContext
# You can configure the SparkContext
conf = SparkConf()
conf.set('spark.local.dir', '/remote/data/match/spark')
conf.set('spark.sql.shuffle.partitions', '2100')
SparkContext.setSystemProperty('spark.executor.memory', '10g')
SparkContext.setSystemProperty('spark.driver.memory', '10g')
sc = SparkContext(appName='mm_exp', conf=conf)
sqlContext = pyspark.SQLContext(sc)
# The path must be passed as a string
data = sqlContext.read.json('file.json')
I feel that the other answer missed an important part of the read sequence: you have to initialize a SparkContext first.
When you start a SparkContext, it also spins up a webUI on port 4040. The webUI can be accessed using http://localhost:4040. That is a useful place to check progress of all calculations.