I need to extract some data from a PipelinedRDD, but while converting it to a DataFrame I get the following error:
Traceback (most recent call last):
  File "/home/karan/Desktop/meds.py", line 42, in <module>
    relevantToSymEntered(newrdd)
  File "/home/karan/Desktop/meds.py", line 26, in relevantToSymEntered
    mat = spark.createDataFrame(self,StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)]))
  File "/home/karan/Downloads/spark-2.4.2-bin-hadoop2.7/python/pyspark/sql/types.py", line 409, in __init__
    "dataType %s should be an instance of %s" % (dataType, DataType)
AssertionError: dataType <class 'pyspark.sql.types.StringType'> should be an instance of <class 'pyspark.sql.types.DataType'>
1. The error I am getting is of a different type: it is an AssertionError, not a TypeError.
I've already tried using toDF(), but it changes the column names, which is undesirable.
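For example, a minimal sketch of what I mean (assuming the two-element tuples produced by reduceColoumns below):

# toDF() with no arguments loses the original names:
# the columns come out as _1 and _2 instead of the names I want to keep
df = filtered.toDF()
df.printSchema()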
import findspark
findspark.init('/home/karan/Downloads/spark-2.4.2-bin-hadoop2.7')
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StringType, IntegerType, StructField, ArrayType
from pyspark import SparkConf, SparkContext
import pandas as pd

def reduceColoumns(self):
    try:
        filtered = self.rdd.map(lambda x: (x["Prescribed medicine"], list([x["Disease"], x["ID"], x["Symptoms Recorded"], x["Severeness"]])))
    except Exception as e:
        print("Error in CleanData:- ")
        print(e)
    return filtered

def cleanData(self, s):
    try:
        self.zipWithIndex
    except Exception as e:
        print("Error in CleanData:- ")
        print(e)
    return self.filter(lambda x: x[1][0]==s)

def relevantToSymEntered(self):
    mat = spark.createDataFrame(self, StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)]))
    #mat = mat.rdd.map(lambda x: (x["Prescribed medicine"], list([x["ID"], x["Symptoms Recorded"], x["Severeness"]])))
    print(type(mat))

conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf = conf)
spark = SQLContext(sc)

rdd = spark.read.csv("/home/karan/Desktop/ExportExcel2.csv", header=True, sep=",", multiLine="True")
print(rdd)
newrdd = reduceColoumns(rdd)
x = input("Enter the disease-")
newrdd = cleanData(newrdd, x)
relevantToSymEntered(newrdd)
StringType is PySpark's string data type; its fromInternal(obj) method converts an internal SQL object into a native Python object.
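A trivial sketch (the medicine string here is made up):

from pyspark.sql.types import StringType

# On an instantiated StringType, fromInternal simply hands back the Python string
print(StringType().fromInternal("Aspirin"))  # -> Aspirin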
In PySpark, you can cast or change a DataFrame column's data type using the cast() function of the Column class; withColumn(), selectExpr(), and SQL expressions can all be used to cast, for example, from String to Int (IntegerType) or from String to Boolean.
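For instance, a minimal sketch (the "age" column and the "people" view name are purely illustrative):

from pyspark.sql.functions import col

# Three equivalent ways to cast an illustrative "age" column from string to integer
df2 = df.withColumn("age", col("age").cast("int"))
df3 = df.selectExpr("cast(age as int) as age")
df.createOrReplaceTempView("people")
df4 = spark.sql("SELECT CAST(age AS INT) AS age FROM people")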
You can find all column names and data types (DataType) of a PySpark DataFrame using df.dtypes and df.schema, and you can retrieve the data type of a specific column by name using df.schema["name"].
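As a quick sketch (assuming a DataFrame df with the columns from the CSV above):

print(df.dtypes)                                     # list of (column name, type string) pairs
print(df.schema)                                     # the full StructType schema
print(df.schema["Prescribed medicine"].dataType)     # DataType of a single column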
Replace:

StructType([StructField("Prescribed medicine",StringType), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType)])

with:

StructType([StructField("Prescribed medicine",StringType()), StructField(["Disease","ID","Symptoms Recorded","Severeness"],ArrayType())])

You need to instantiate the class: StructField expects a DataType instance such as StringType(), not the StringType class itself, which is exactly what the assertion message is complaining about.
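Note that the second StructField above still passes a list of names where a single string is expected, and ArrayType itself needs an element type. A fuller sketch of a schema that instantiates cleanly (grouping the four remaining fields into one array column named "Details" is only an illustrative assumption about the intended layout):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Each StructField takes a single string name plus a DataType *instance*;
# ArrayType additionally needs an element type instance.
schema = StructType([
    StructField("Prescribed medicine", StringType()),
    StructField("Details", ArrayType(StringType())),  # illustrative name; assumes the grouped values are strings
])

mat = spark.createDataFrame(newrdd, schema)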