In Spark SQL, when I try to use the map function on a DataFrame, I get the error below.
The method map(Function1, ClassTag) in the type DataFrame is not applicable for the arguments (new Function(){})
I am following the Spark 1.3 documentation as well: https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection. Does anyone have a solution?
Here is my testing code.
// SQL can be run over RDDs that have been registered as tables.
DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(
    new Function<Row, String>() {
        public String call(Row row) {
            return "Name: " + row.getString(0);
        }
    }).collect();
Spark map() usage on DataFrame
Spark provides two map transformation signatures on DataFrame: one takes a scala.Function1 as an argument and the other takes a Spark MapFunction. If you look at the signatures below, both of these functions return Dataset[U], not DataFrame (DataFrame = Dataset[Row]).
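For reference, the two overloads look roughly like this from the Java side (a simplified sketch; the exact generic bounds and parameter names in Dataset differ slightly):

// Scala-friendly overload: takes a scala.Function1 plus an Encoder for the result type
<U> Dataset<U> map(scala.Function1<Row, U> func, Encoder<U> evidence);
// Java-friendly overload: takes org.apache.spark.api.java.function.MapFunction
<U> Dataset<U> map(MapFunction<Row, U> func, Encoder<U> encoder);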
Spark's map() function takes one element as input, processes it according to custom code (specified by the developer), and returns one element at a time. map() transforms an RDD of length N into another RDD of length N; the input and output RDDs will typically have the same number of records.
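A minimal sketch of that one-in, one-out behaviour on a plain RDD (assuming sc is an existing JavaSparkContext; the sample values are made up for illustration):

JavaRDD<String> words = sc.parallelize(Arrays.asList("spark", "map", "example"));
// map() is applied to every element, so 3 input records produce 3 output records.
JavaRDD<Integer> lengths = words.map(w -> w.length());
System.out.println(lengths.collect()); // [5, 3, 7]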
2.1 Using Spark DataTypes. We can create a map column using the createMapType() function on the DataTypes class. This method takes two arguments, keyType and valueType, as mentioned above, and both arguments must be of a type that extends DataType.
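A short sketch of what that looks like for a string-to-integer map column (the field names are illustrative only, not taken from the question; the classes come from org.apache.spark.sql.types):

// Both the key type and the value type must extend DataType.
MapType propertiesType = DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType);
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("name", DataTypes.StringType, false),
    DataTypes.createStructField("properties", propertiesType, true)
});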
Change this to:
Java 6 & 7
List<String> teenagerNames = teenagers.javaRDD().map(
    new Function<Row, String>() {
        public String call(Row row) {
            return "Name: " + row.getString(0);
        }
    }).collect();
Java 8
List<String> t2 = teenagers.javaRDD().map(
    row -> "Name: " + row.getString(0)
).collect();
Once you call javaRDD(), it works just like any other RDD map function.
This works with Spark 1.3.0 and up.
There is no need to convert to an RDD; that just delays the execution. It can be done as below:
public static void mapMethod() {
    // Read the data from a file on the classpath.
    Dataset<Row> df = sparkSession.read().json("file1.json");

    Encoder<String> encoder = Encoders.STRING();

    // Prior to Java 1.8: use an anonymous MapFunction
    List<String> rowsList = df.map(new MapFunction<Row, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String call(Row row) throws Exception {
            return "string:>" + row.getString(0) + "<";
        }
    }, encoder).collectAsList();

    // From Java 1.8 onwards: a lambda (cast to MapFunction to avoid overload ambiguity)
    List<String> rowsList1 = df.map(
        (MapFunction<Row, String>) row -> "string >" + row.getString(0) + "<",
        encoder).collectAsList();

    System.out.println(">>> " + rowsList);
    System.out.println(">>> " + rowsList1);
}