Building a StructType from a dataframe in pyspark

I am new to Spark and Python and am facing the difficulty of building a schema from a metadata file that can be applied to my data file. Scenario: the metadata file for the data file (CSV format) contains the columns and their types, for example:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

I have successfully converted this to a dataframe that looks like:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

But when I try to convert this to a StructField format using this

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

OR

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

And then later convert it to StructType, using

schemaFinal = StructType(schemaList)

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

I am stuck on this due to my lack of knowledge of DataFrames. Can you please advise how to proceed? Once I have the schema ready I want to use createDataFrame to apply it to my data file. This process has to be done for many tables, so I do not want to hardcode the types but rather use the metadata file to build the schema and then apply it to the RDD.

Thanks in advance.

asked Mar 16 '16 by learning


2 Answers

The fields argument has to be a list of DataType objects. This:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

generates, after collect, a list of lists of tuples (Rows) of DataType (list[list[tuple[DataType]]]), not to mention that the nullable argument should be a boolean, not a string.

Your second attempt:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

generates, after collect, a list of str objects.

Correct schema for the record you've shown should look more or less like this:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

Although using distributed data structures for a task like this is serious overkill, not to mention inefficient, you can adjust your first solution as follows:

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])

but it is not particularly safe (eval). It could be easier to build a schema from JSON / a dictionary, assuming you have a function which maps from the type description to the canonical type name:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

You can then build a dictionary of the following shape:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

and feed it to StructType.fromJson:

StructType.fromJson(schema_dict)
answered Oct 18 '22 by zero323

The steps below can be followed to build a schema from DataType objects and apply it when reading a file:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data_schema = [
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
]

final_struct = StructType(fields=data_schema)

df = spark.read.json('/home/abcde/Python-and-Spark-for-Big-Data-master/Spark_DataFrames/people.json', schema=final_struct)



df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
answered Oct 18 '22 by BigData-Guru