 

PySpark textFile JSON with indentation

I want to read a JSON file with indentation into an RDD, but Spark throws an exception.

import json

# txts = sc.textFile('data/jsons_without_indentation')  # works
txts = sc.textFile('data/jsons_with_indentation')        # fails
txts_dicts = txts.map(lambda data: json.loads(data))
txts_dicts.collect()

sc.wholeTextFiles does not work either. Is it possible to load a JSON file with indentation without first converting it to a file without indentation?

An example JSON file looks like this:

{
    "data": {
        "text": {
            "de": "Ein Text.",
            "en": "A text."
        }
    }
}
asked Mar 15 '23 by rebeling

1 Answer

If this is just a single JSON document per file, all you need is SparkContext.wholeTextFiles. First, let's create some dummy data:

import tempfile
import json 

input_dir = tempfile.mkdtemp()

docs = [
    {'data': {'text': {'de': 'Ein Text.', 'en': 'A text.'}}},
    {'data': {'text': {'de': 'Ein Bahnhof.', 'en': 'A railway station.'}}},
    {'data': {'text': {'de': 'Ein Hund.', 'en': 'A dog.'}}}]

for doc in docs:
    with open(tempfile.mktemp(suffix="json", dir=input_dir), "w") as fw:
        json.dump(doc, fw, indent=4)

Now let's read the data:

# wholeTextFiles yields (path, content) pairs; .values() keeps only the file contents
rdd = sc.wholeTextFiles(input_dir).values()

and make sure that files are indeed indented:

print rdd.top(1)[0]

## {
##     "data": {
##         "text": {
##             "de": "Ein Text.", 
##             "en": "A text."
##         }
##     }
## }

Finally, we can parse:

parsed = rdd.map(json.loads)

and check if everything worked as expected:

parsed.takeOrdered(3)

## [{u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}},
##  {u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}}]

If you still experience problems, it is most likely due to malformed entries. The simplest thing you can do is discard malformed entries using flatMap with a custom wrapper:

rdd_malformed = sc.parallelize(["{u'data': {u'text': {u'de':"]).union(rdd)

## org.apache.spark.api.python.PythonException: Traceback (most recent call ...
##     ...
## ValueError: Expecting property name: line 1 column 2 (char 1)

and wrap the parsing with seq_try (defined here: What is the equivalent to scala.util.Try in pyspark?):
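
For reference, a minimal sketch of such a wrapper (the name seq_try follows the linked answer; the exact implementation there may differ):

def seq_try(f, *args, **kwargs):
    """Call f and wrap the result in a one-element list; return an
    empty list on failure, so flatMap silently drops bad records."""
    try:
        return [f(*args, **kwargs)]
    except Exception:
        return []

Returning a list instead of raising lets flatMap act as a filter and map in a single pass.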

rdd_malformed.flatMap(lambda x: seq_try(json.loads, x)).collect()

## [{u'data': {u'text': {u'de': u'Ein Hund.', u'en': u'A dog.'}}},
##  {u'data': {u'text': {u'de': u'Ein Text.', u'en': u'A text.'}}},
##  {u'data': {u'text': {u'de': u'Ein Bahnhof.', u'en': u'A railway station.'}}}]
answered Mar 19 '23 by zero323