I'm trying to dynamically build a row in PySpark 1.6.1, then build it into a DataFrame. The general idea is to extend the results of describe to include, for example, skew and kurtosis. Here's what I thought should work:
```python
from pyspark.sql import Row

row_dict = {'C0': -1.1990072635132698, 'C3': 0.12605772684660232,
            'C4': 0.5760856026559944, 'C5': 0.1951877800894315,
            'C6': 24.72378589441825, 'summary': 'kurtosis'}
new_row = Row(row_dict)
```
But this returns `TypeError: sequence item 0: expected string, dict found`, which is a fairly clear error. Then I found that if I defined the Row fields first, I could use a dict:
```python
r = Row('summary', 'C0', 'C3', 'C4', 'C5', 'C6')
r(row_dict)
# Row(summary={'summary': 'kurtosis', 'C3': 0.12605772684660232,
#              'C0': -1.1990072635132698, 'C6': 24.72378589441825,
#              'C5': 0.1951877800894315, 'C4': 0.5760856026559944})
```
Which would be a fine step, except it doesn't seem like I can dynamically specify the fields in Row. I need this to work for an unknown number of fields with names I won't know in advance. According to the documentation, you can actually go the other way:
```python
>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True
```
So it seems like I should be able to do this. It also appears there may be some deprecated features from older versions that allowed this, for example here. Is there a more current equivalent I'm missing?
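For context, here is a sketch of how a row dict like the one above might be produced in the first place. This assumes an existing DataFrame named `df` with those numeric columns (the names here are illustrative); `skewness` and `kurtosis` are available in `pyspark.sql.functions` as of 1.6:

```python
from pyspark.sql import functions as F

# Assumed: df is an existing DataFrame with numeric columns C0, C3, C4, C5, C6.
numeric_cols = ['C0', 'C3', 'C4', 'C5', 'C6']

# Compute kurtosis for every column in one pass, then collect the single
# result Row and turn it into a plain dict.
row_dict = df.select([F.kurtosis(c).alias(c) for c in numeric_cols]).first().asDict()
row_dict['summary'] = 'kurtosis'
```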
You can use keyword argument unpacking as follows:
```python
Row(**row_dict)
## Row(C0=-1.1990072635132698, C3=0.12605772684660232, C4=0.5760856026559944,
##     C5=0.1951877800894315, C6=24.72378589441825, summary='kurtosis')
```
It is important to note that it internally sorts the fields by name, to address problems with older Python versions where keyword argument order was not preserved.

This behavior is likely to be removed in upcoming releases - see SPARK-29748, *Remove sorting of fields in PySpark SQL Row creation*. Once it is removed, you'll have to ensure that the order of values in the dict is consistent across records.
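A minimal sketch of keeping that order explicit (assuming a Spark 1.6-style `sqlContext`; the dicts and values here are illustrative):

```python
from pyspark.sql import Row

dicts = [
    {'summary': 'kurtosis', 'C0': -1.199, 'C3': 0.126},
    {'summary': 'skewness', 'C0': 0.452, 'C3': -0.018},
]

# Fix one canonical field order and apply it to every record, so the rows
# stay aligned even when dict key order is not normalized for you.
fields = sorted(dicts[0].keys())
StatRow = Row(*fields)  # a reusable Row class with fixed field names
rows = [StatRow(*[d[f] for f in fields]) for d in dicts]

df = sqlContext.createDataFrame(rows)
```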
In case the dict is not flat, you can convert it to a Row recursively:
```python
def as_row(obj):
    # Dicts become Rows, converting their values first so nesting works.
    if isinstance(obj, dict):
        dictionary = {k: as_row(v) for k, v in obj.items()}
        return Row(**dictionary)
    # Lists are converted element by element.
    elif isinstance(obj, list):
        return [as_row(v) for v in obj]
    # Anything else is already a plain value.
    else:
        return obj
```
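Hypothetical usage on a nested record (the names and values here are illustrative):

```python
nested = {'summary': 'kurtosis',
          'stats': {'C0': -1.199, 'C3': 0.126},
          'tags': ['numeric', 'continuous']}

as_row(nested)
# Row(stats=Row(C0=-1.199, C3=0.126), summary='kurtosis',
#     tags=['numeric', 'continuous'])
```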