Access Dataframe's Row inside Row (nested JSON) with Pyspark

Using PySpark, I am reading multiple files, each containing one JSON object, from a folder contentdata2:

df = spark.read\
.option("mode", "DROPMALFORMED")\
.json("./data/contentdata2/")

df.printSchema()
content = df.select('fields').collect()

where df.printSchema() yields

root
|-- fields: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- field: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |    |-- value: string (nullable = true)
|-- id: string (nullable = true)
|-- score: double (nullable = true)
|-- siteId: string (nullable = true)

I wish to access fields.element.field and, for each JSON object, store the element whose field equals body and the element whose field equals urlhash.

The format of content is a list of Rows, each a Row(fields=...) containing other Rows, like this:

[Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c68325aa02cae517821e78687b2780')]), Row(fields=[Row(field='body', type=None, value='["First line of text","Second line of text"]'), Row(field='urlhash', type=None, value='0a0b774c21c6caca977e7821e78687b2780')]), ...

The repeating [Row(fields=[Row(field=... pattern appears because the JSON objects from the different files are merged into one list. There are also many other Row elements which I am not interested in and therefore did not include in the example.

The structure of the JSON objects looks like this:

{
  "fields": [
    {
      "field": "body",
      "value": [
        "Some text",
        "Another line of text",
        "Third line of text."
      ]
    },
    {
      "field": "urlhash",
      "value": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
    }
  ],
  "score": 0.87475455,
  "siteId": "9222270286501375973",
  "id": "0a0a341e189cf2c002cb83b2dc529fbc454f97cc"
}

I wish to store all words from the body of each url, to later remove stopwords and feed them into a k-nearest-neighbours algorithm.

How do I approach storing the words from the body for each url, preferably as a TSV or CSV with columns urlhash and words (a list of words from the body)?

asked Mar 21 '18 by sandrask

1 Answer

You can approach this in two ways:

  • you can explode the array to get one record per line and then flatten the nested data frame
  • or access the sub-fields directly (Spark 2.x and later)

Let's start with your sample data frame:

from pyspark.sql import Row
from pyspark.sql.types import *
schema = StructType([
    StructField('fields', ArrayType(StructType([
        StructField('field', StringType()), 
        StructField('type', StringType()), 
        StructField('value', StringType())])))])

content = spark.createDataFrame(
    sc.parallelize([
        Row(
            fields=[
                Row(
                    field='body', 
                    type=None, 
                    value='["First line of text","Second line of text"]'), 
                Row(
                    field='urlhash', 
                    type=None, 
                    value='0a0b774c21c68325aa02cae517821e78687b2780')]), 
        Row(
            fields=[
                Row(
                    field='body', 
                    type=None, 
                    value='["First line of text","Second line of text"]'), 
                Row(
                    field='urlhash', 
                    type=None, 
                    value='0a0b774c21c6caca977e7821e78687b2780')])]), schema=schema)
content.printSchema()

    root
     |-- fields: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- field: string (nullable = true)
     |    |    |-- type: string (nullable = true)
     |    |    |-- value: string (nullable = true)

1. Explode and Flatten

A nested data frame's fields can be accessed using .; * lets you flatten all of the nested fields and bring them to the root level.

import pyspark.sql.functions as psf

flat = content \
    .select(psf.explode('fields').alias('tmp')) \
    .select('tmp.*')
flat.show()
flat.printSchema()

    +-------+----+--------------------+
    |  field|type|               value|
    +-------+----+--------------------+
    |   body|null|["First line of t...|
    |urlhash|null|0a0b774c21c68325a...|
    |   body|null|["First line of t...|
    |urlhash|null|0a0b774c21c6caca9...|
    +-------+----+--------------------+

    root
     |-- field: string (nullable = true)
     |-- type: string (nullable = true)
     |-- value: string (nullable = true)
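
One caveat worth noting (this addition is not from the original answer): after the explode, the body and urlhash rows of different documents are interleaved, so nothing ties a body to its urlhash any more. A minimal sketch of one way to keep them paired, using the illustrative names doc_id and paired: tag every record with a unique id before exploding, then pivot the field column back into columns.

import pyspark.sql.functions as psf

# Sketch, not part of the original answer: a unique id per source record
# keeps its body and urlhash together; pivot turns the field values back
# into columns, one row per original document.
paired = content \
    .withColumn('doc_id', psf.monotonically_increasing_id()) \
    .select('doc_id', psf.explode('fields').alias('tmp')) \
    .select('doc_id', 'tmp.field', 'tmp.value') \
    .groupBy('doc_id') \
    .pivot('field', ['body', 'urlhash']) \
    .agg(psf.first('value'))
paired.show()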

2. Access Sub-Fields Directly

In later versions of Spark, you can access fields of nested StructTypes even when they are contained in an ArrayType. You'll end up with an ArrayType of the sub-field's values.

sub = content.select('fields.field')
sub.show()
sub.printSchema()

    +---------------+
    |          field|
    +---------------+
    |[body, urlhash]|
    |[body, urlhash]|
    +---------------+

    root
     |-- field: array (nullable = true)
     |    |-- element: string (containsNull = true)
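
To get all the way to the question's goal (a tab-separated file with urlhash and words columns), here is a hedged sketch building on the paired data frame from the note above. It assumes Spark 2.4+ (for from_json with a non-struct array schema), and the output path is a placeholder. The body value is itself a JSON-encoded list of strings, so it is parsed first and then split on whitespace.

import pyspark.sql.functions as psf
from pyspark.sql.types import ArrayType, StringType

# Sketch only: 'paired' holds one row per document with body and urlhash
# columns, as built in the earlier note. Parse the JSON-encoded body into
# an array of lines, then split on whitespace to get individual words.
words = paired \
    .withColumn('lines', psf.from_json('body', ArrayType(StringType()))) \
    .withColumn('words', psf.split(psf.concat_ws(' ', 'lines'), '\\s+')) \
    .select('urlhash', 'words')

# CSV cells cannot hold arrays, so join the word list back into a single
# space-separated string before writing the tab-separated file.
words.select('urlhash', psf.concat_ws(' ', 'words').alias('words')) \
    .write.csv('./data/body_words', sep='\t', header=True)

From there, stopword removal can be applied to the words column before writing, for example with pyspark.ml.feature.StopWordsRemover.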
answered Nov 10 '22 by MaFF