 

Converting JavaRDD to DataFrame in Spark Java

I am trying to process a log file. First I read the log file, split it according to my requirements, and saved each column into a separate JavaRDD. Now I need to convert these JavaRDDs into DataFrames for further operations. This is the code I have tried so far:

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

and this is how I am trying to convert the JavaRDD into a DataFrame:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

But the above line is not working. I am confused about Model.class.

Can anyone suggest how to do this?

Thanks.

ROOT asked Dec 23 '16


People also ask

Can we convert Dataset to DataFrame in Spark?

Converting a Spark RDD to a DataFrame can be done with toDF(), with createDataFrame(), or by transforming the data into an RDD[Row] and applying a schema.
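
In the Java API, a minimal sketch of the toDF() route looks like this (assuming a SparkSession named spark and the urlrdd JavaRDD<String> from the question above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Wrap the JavaRDD<String> in a typed Dataset, then rename its single column.
Dataset<String> urlDS = spark.createDataset(urlrdd.rdd(), Encoders.STRING());
Dataset<Row> urlDF = urlDS.toDF("url");   // a one-column DataFrame named "url"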

How do you convert existing RDDs to Spark datasets?

Convert using the createDataFrame method: the SparkSession object has a utility method for creating a DataFrame, createDataFrame. This method can take an RDD and create a DataFrame from it. createDataFrame is overloaded, and we can call it by passing the RDD alone or together with a schema.

What is JavaRDD Spark?

(If you're new to Spark, JavaRDD is a distributed collection of objects, in this case lines of text in a file. We can apply operations to these objects that will automatically be parallelized across a cluster.)
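
For example (a toy sketch, assuming the JavaSparkContext sc from the question):

import org.apache.spark.api.java.JavaRDD;

// Each element is one line of the file; map() is applied in parallel across partitions.
JavaRDD<String> lines = sc.textFile("/Users/karuturi/Downloads/log.txt");
JavaRDD<Integer> lineLengths = lines.map(String::length);
System.out.println(lineLengths.count());   // an action triggers the distributed computation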

What is difference between Dataset and DataFrame?

DataFrames are a SparkSQL data abstraction and are similar to relational database tables or Python Pandas DataFrames. A Dataset is also a SparkSQL structure and represents an extension of the DataFrame API. The Dataset API combines the performance optimization of DataFrames and the convenience of RDDs.
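
In the Java API the difference is just the element type: a DataFrame is Dataset<Row>, while a typed Dataset carries a specific class (a small sketch, assuming a SparkSession named spark):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Untyped rows: this is what "DataFrame" means in the Java API.
Dataset<Row> df = spark.read().text("/Users/karuturi/Downloads/log.txt");

// The same data viewed as a typed Dataset<String> via an encoder.
Dataset<String> ds = df.as(Encoders.STRING());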


2 Answers

Imports:

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Create a POJO class for the URL. I'd recommend writing one for the whole log line, with url, date, time, method, target, etc. as members (a sketch of such a bean follows the Url class below):

public static class Url implements Serializable {
  private String value;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}  
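
If you model the whole log line as recommended above, the bean might look like this (field names are illustrative; add one member per column in your log):

public static class LogLine implements Serializable {
  private String url;
  private String method;

  public String getUrl() { return url; }
  public void setUrl(String url) { this.url = url; }

  public String getMethod() { return method; }
  public void setMethod(String method) { this.method = method; }

  // add date, time, target, ... with matching getters and setters
}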

Create an RDD of Url objects from a text file

JavaRDD<Url> urlsRDD = spark.read()
  .textFile("/Users/karuturi/Downloads/log.txt")
  .javaRDD()
  .map(new Function<String, Url>() {
    @Override
    public Url call(String line) throws Exception {
      String[] parts = line.split("\\t");
      Url url = new Url();
      url.setValue(parts[0].replace("[", ""));  // note: replace(), not replaceAll() - "[" alone is not a valid regex
      return url;
    }
  });

Create DataFrame from RDD

Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);
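
Once built, the DataFrame behaves like any other; for example (a small usage sketch, assuming the urlsDF above and a SparkSession named spark):

urlsDF.printSchema();                    // one string column named "value", taken from the bean's getter
urlsDF.createOrReplaceTempView("urls");
spark.sql("SELECT value FROM urls LIMIT 10").show();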

RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6

mrsrinivas answered Oct 10 '22


You can do something like the following (I am converting on the fly from Scala, so excuse any typos):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        return RowFactory.create(record);
    }
});
// now you wish to create the target schema. This is basically a list of
// fields (each field would be a column) which you are adding to a StructType
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);

// now you can create the dataframe:
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

A couple of additional notes:

  • Why are you flatMapping when you are only taking the first element? You could have simply done:

    JavaRDD<String> urlrdd = diskfile.map(line -> line.split("\t")[0]);

  • I assume in real life you would want to remove the '[' from the url (you can easily do this in the map).

  • If you are moving to Spark 2.0 or later, then instead of sqlContext you should be using a SparkSession (spark).

  • You can create a single DataFrame with all columns. Do this by adding all the fields to the schema (i.e. instead of a single add to fields, add all of them), and instead of using urlrdd, use diskfile and do the split inside the "public Row call" method. That would look something like this:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            String[] recs = record.split("\t");
            return RowFactory.create(recs[0], recs[1], ...);
        }
    });

  • You can also create the DataFrame directly from the file, without going through an RDD at all (a fuller sketch follows after this list). Just use:

    sqlContext.read().schema(schema).option("sep", "\t").csv(filename);
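
Putting that last point together, a fuller sketch for Spark 2.x (path taken from the question; only the "url" column is shown, so add a field per tab-separated column):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().appName("AuctionBid").master("local").getOrCreate();

// One field per column of the tab-separated log file.
StructType schema = new StructType()
    .add("url", DataTypes.StringType, true);

Dataset<Row> df = spark.read()
    .schema(schema)
    .option("sep", "\t")
    .csv("/Users/karuturi/Downloads/log.txt");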

Assaf Mendelson answered Oct 10 '22