I use Apache Spark as an ETL tool to fetch tables from Oracle into Elasticsearch. I have an issue with numeric columns that Spark recognizes as decimal, whereas Elasticsearch doesn't accept the decimal type; so I convert each decimal column into double, which Elasticsearch does accept:
dataFrame = dataFrame.select(
    [col(name) if 'decimal' not in colType else col(name).cast('double')
     for name, colType in dataFrame.dtypes]
)
The problem with this is that every numeric column ends up as double, whether or not it actually holds fractional values. My question: is there any way to detect whether a column should be converted to an integer type or to double?
You can retrieve all column names whose datatype is DecimalType() from the schema of the DataFrame; see below for an example (tested on Spark 2.4.0).
Update: just use df.dtypes, which is enough to retrieve the information.
from pyspark.sql.functions import col

df = spark.createDataFrame([(1, 12.3, 1.5, 'test', 13.23)], ['i1', 'd2', 'f3', 's4', 'd5'])

# cast two columns to DecimalType to mimic the Oracle source
df = df.withColumn('d2', col('d2').astype('decimal(10,1)')) \
       .withColumn('d5', col('d5').astype('decimal(10,2)'))
# DataFrame[i1: bigint, d2: decimal(10,1), f3: double, s4: string, d5: decimal(10,2)]

# pick out the columns whose dtype string starts with 'decimal'
decimal_cols = [f[0] for f in df.dtypes if f[1].startswith('decimal')]
print(decimal_cols)
# ['d2', 'd5']
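To address the integer-vs-double part of the question: one option (a sketch of my own, not tested against an Oracle source) is to look at the declared scale of each DecimalType field in df.schema. A decimal with scale 0 can only hold whole numbers, so it can be cast to bigint; anything else goes to double. Note that this inspects the declared schema, not the actual values, so a column declared with a nonzero scale is treated as fractional even if every stored value happens to be whole.

from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

def decimal_to_numeric(df):
    """Hypothetical helper: cast decimal(p,0) columns to bigint, other decimals to double."""
    exprs = []
    for field in df.schema.fields:
        if isinstance(field.dataType, DecimalType):
            target = 'bigint' if field.dataType.scale == 0 else 'double'
            exprs.append(col(field.name).cast(target).alias(field.name))
        else:
            exprs.append(col(field.name))
    return df.select(*exprs)

# decimal_to_numeric(df)
# DataFrame[i1: bigint, d2: double, f3: double, s4: string, d5: double]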
Just a follow-up: the above method will not work for array, struct and other nested data structures. If the field names inside the struct don't contain characters like spaces, dots etc., you can use the type string from df.dtypes directly:
import re
from pyspark.sql.functions import array, struct, col

# rewrite every decimal(p,s) inside a dtype string to double
decimal_to_double = lambda x: re.sub(r'decimal\(\d+,\d+\)', 'double', x)

df1 = df.withColumn('a6', array('d2','d5')).withColumn('s7', struct('i1','d2'))
# DataFrame[i1: bigint, d2: decimal(10,1), f3: double, s4: string, d5: decimal(10,2), a6: array<decimal(11,2)>, s7: struct<i1:bigint,d2:decimal(10,1)>]

df1.select(*[col(d[0]).astype(decimal_to_double(d[1])) if 'decimal' in d[1] else col(d[0]) for d in df1.dtypes])
# DataFrame[i1: bigint, d2: double, f3: double, s4: string, d5: double, a6: array<double>, s7: struct<i1:bigint,d2:double>]
However, if any field names inside a StructType() contain spaces, dots etc., the above method might not work, because those characters break the dtype strings. In that case, I suggest checking df.schema.jsonValue()['fields'] and manipulating the JSON representation of the schema to do the dtype transformation.
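As a rough sketch of that idea (my own illustration, same decimal-to-double goal as above): walk the dict returned by df1.schema.jsonValue(), replace every decimal(p,s) type string with double, rebuild the schema with StructType.fromJson, and cast each top-level column to its rewritten type. No field names are parsed out of a dtype string, so spaces and dots inside struct fields don't get in the way.

import re
from pyspark.sql.functions import col
from pyspark.sql.types import StructType

def replace_decimal(node):
    """Recursively replace any 'decimal(p,s)' type string in the schema JSON with 'double'."""
    if isinstance(node, dict):
        return {k: replace_decimal(v) for k, v in node.items()}
    if isinstance(node, list):
        return [replace_decimal(v) for v in node]
    if isinstance(node, str) and re.fullmatch(r'decimal\(\d+,\d+\)', node):
        return 'double'
    return node

new_schema = StructType.fromJson(replace_decimal(df1.schema.jsonValue()))

# cast each top-level column to its rewritten type (the problematic names are assumed
# to live inside structs, so plain col() on the top-level names is fine)
df2 = df1.select([col(f.name).cast(f.dataType).alias(f.name) for f in new_schema.fields])
# DataFrame[i1: bigint, d2: double, f3: double, s4: string, d5: double, a6: array<double>, s7: struct<i1:bigint,d2:double>]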