I am trying to process the LogFile. first i read the log file and split these file as per my requirement and saved each column into separate JavaRDD. Now i need to convert these JavaRDD's to DataFrames for future operations. This is the code what i tried so far:
SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
System.out.println(urlrdd.take(1));
SQLContext sql = new SQLContext(sc);
and this is the way how i am trying to convert JavaRDD into DataFrame:
DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);
But the above line is not working.I confusing about Model.class.
can anyone suggest me.
Thanks.
Converting Spark RDD to DataFrame can be done using toDF(), createDataFrame() and transforming rdd[Row] to the data frame.
Convert Using createDataFrame Method The SparkSession object has a utility method for creating a DataFrame – createDataFrame. This method can take an RDD and create a DataFrame from it. The createDataFrame is an overloaded method, and we can call the method by passing the RDD alone or with a schema.
(If you're new to Spark, JavaRDD is a distributed collection of objects, in this case lines of text in a file. We can apply operations to these objects that will automatically be parallelized across a cluster.)
DataFrames are a SparkSQL data abstraction and are similar to relational database tables or Python Pandas DataFrames. A Dataset is also a SparkSQL structure and represents an extension of the DataFrame API. The Dataset API combines the performance optimization of DataFrames and the convenience of RDDs.
Imports:
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Create a POJO class for URL. I'd recommend you to write for Log line which consists of url, date, time, method, target,.. etc as members
public static class Url implements Serializable {
private String value;
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
Create an RDD of Url objects from a text file
JavaRDD<Url> urlsRDD = spark.read()
.textFile("/Users/karuturi/Downloads/log.txt")
.javaRDD()
.map(new Function<String, Url>() {
@Override
public Url call(String line) throws Exception {
String[] parts = line.split("\\t");
Url url = new Url();
url.setValue(parts[0].replaceAll("[", ""));
return url;
}
});
Create DataFrame from RDD
Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6
You can do something like (I am converting on the fly from scala so excuse any typos):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
@Override
public Row call(String record) throws Exception {
return RowFactory.create(record());
}
}
// now you wish to create the target schema. This is basically a list of
// fields (each field would be a column) which you are adding to a StructType
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);
// now you can create the dataframe:
DataFrame df= sqlContext.createDataFrame(rowRDD, schema);
A couple additional notes:
Why are you flatmaping when you are only taking the first element? You could have simply done:
JavaRDD<String> urlrdd=diskfile.flatMap(line -> line.split("\t")[0]);
I assume in real life you would want to remove the '[' from the url (you can easily do this in the map).
If you are moving to spark 2.0 or later then instead of sqlContext you should be using spark session (spark).
You can create a single dataframe with all columns. You can do this by adding all fields to the schema (i.e. instead of just doing a single add to the fields add all of them). Instead of using urlrdd, use diskfile and do the split inside the "public Row call" creation. This would be something like this:
JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() {
@override public Row call(String record) throws Exception {
String[] recs = record.split("\t")
return RowFactory.create(recs[0], recs[1], ...);
}
});
You can create it directly: Just use
sqlContext.read.option("sep","\t").csv.load(filename,schema)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With