How to change column metadata in pyspark?

How can I update column metadata in PySpark? I have metadata values corresponding to the nominal encoding of categorical (string) features, and I would like to decode them back in an automated way. Writing metadata is not directly available in the PySpark API unless you recreate the schema. Is it possible to edit metadata in PySpark on the fly, without converting the dataset to an RDD and back, given a complete schema description (as described here)?

Example listing:

from pyspark.ml.feature import VectorSlicer, IndexToString
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Create DF (construction elided)
df.show()

# +---+-------------+
# | id|     features|
# +---+-------------+
# |  0|[1.0,1.0,4.0]|
# |  1|[2.0,2.0,4.0]|
# +---+-------------+
# - This one has all the necessary metadata about what is encoded in the features column

# Slice one feature out
df = VectorSlicer(inputCol='features', outputCol='categoryIndex', indices=[1]).transform(df)
df = df.drop('features')
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# |  0|        [1.0]|
# |  1|        [2.0]|
# +---+-------------+
# categoryIndex now carries metadata about a single-element vector with the encoding

# Get rid of the single-element vector
first_element = udf(lambda x: float(x[0]), DoubleType())
df2 = df.select(*[first_element(column).alias(column) if column == 'categoryIndex' else column for column in df.columns])
# +---+-------------+
# | id|categoryIndex|
# +---+-------------+
# |  0|          1.0|
# |  1|          2.0|
# +---+-------------+
# - Metadata is lost for this one


# Write metadata
extract = ...  # elided in the original: maps the vector metadata to scalar metadata
df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
# The metadata is readable from df2.schema.fields[1].metadata but has no effect.
# Saving df2 to Parquet and reading it back discards the change.
# Decode categorical
df = IndexToString(inputCol="categoryIndex", outputCol="category").transform(df2)
# ERROR. Was supposed to decode the categorical values

This question provides insight into how to work with VectorAssembler and VectorIndexer, and how to add metadata by constructing a complete schema with StructType, but it does not answer my question.

asked May 30 '17 by y.selivonchyk




1 Answer

In both cases losing metadata is expected:

  • When you call a Python udf, there is no relationship between the input Column (with its metadata) and the output Column. UserDefinedFunctions (in both Python and Scala) are black boxes to the Spark engine.
  • Assigning data directly to the Python schema object:

    df2.schema.fields[1].metadata = extract(df.schema.fields[1].metadata)
    

    is not a valid approach at all. A Spark DataFrame is a thin wrapper around a JVM object. Any changes to the Python wrappers are completely opaque to the JVM backend and won't be propagated at all:

    import json 
    
    df = spark.createDataFrame([(1, "foo")], ("k", "v"))
    df.schema[-1].metadata = {"foo": "bar"}
    
    json.loads(df._jdf.schema().json())
    
    ## {'fields': [{'metadata': {}, 'name': 'k', 'nullable': True, 'type': 'long'},
    ##   {'metadata': {}, 'name': 'v', 'nullable': True, 'type': 'string'}],
    ## 'type': 'struct'}
    

    or even preserved in Python:

    df.select("*").schema[-1].metadata
    ## {}
    

With Spark < 2.2 you can use a small wrapper (taken from Spark Gotchas, maintained by me and @eliasah):

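import json

from pyspark import SparkContext
from pyspark.sql.column import Column
from pyspark.sql.functions import col

def withMeta(self, alias, meta):
    # Call the JVM Column.as(alias, Metadata) overload directly,
    # converting the Python dict to a JVM Metadata object via JSON.
    sc = SparkContext._active_spark_context
    jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
    return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))

df.withColumn("foo", withMeta(col("foo"), "", {...}))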

With Spark >= 2.2 you can use Column.alias:

df.withColumn("foo", col("foo").alias("", metadata={...}))
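Applied to the question's case, the remaining piece is the scalar metadata that IndexToString reads. The sketch below is a minimal illustration, assuming the Spark ML attribute JSON layout ("ml_attr" containing "attrs" with "nominal" entries that carry "idx" and "vals") for vector columns, and the {"ml_attr": {"type": "nominal", ...}} layout for scalar columns. The helper name nominal_meta_from_vector_slot is hypothetical, and it only manipulates plain dicts, so it can be checked without a Spark session:

```python
def nominal_meta_from_vector_slot(vector_meta, idx, name):
    """Build scalar nominal-attribute metadata for one slot of a vector column.

    Hypothetical helper. Assumes the Spark ML vector attribute layout:
        {"ml_attr": {"attrs": {"nominal": [{"idx": ..., "name": ..., "vals": [...]}]},
                     "num_attrs": ...}}
    and returns the scalar nominal layout:
        {"ml_attr": {"type": "nominal", "name": ..., "vals": [...]}}
    """
    attrs = vector_meta.get("ml_attr", {}).get("attrs", {})
    # Look only at nominal (categorical) slots; numeric/binary slots have no labels.
    for attr in attrs.get("nominal", []):
        if attr.get("idx") == idx:
            return {"ml_attr": {"type": "nominal", "name": name, "vals": list(attr["vals"])}}
    raise KeyError(f"no nominal attribute at index {idx}")


# Example: metadata of a single-slot vector whose slot 0 is categorical.
sliced_meta = {
    "ml_attr": {
        "attrs": {"nominal": [{"idx": 0, "name": "category", "vals": ["a", "b", "c"]}]},
        "num_attrs": 1,
    }
}
print(nominal_meta_from_vector_slot(sliced_meta, 0, "categoryIndex"))
```

With Spark >= 2.2 the resulting dict could then be passed as the metadata argument of col("categoryIndex").alias("categoryIndex", metadata=...). Whether IndexToString then decodes the column depends on the layout matching what your Spark version actually writes, so it is worth printing df.schema["categoryIndex"].metadata first and adjusting the lookup if it differs.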
answered Sep 22 '22 by zero323