
Spark DataFrame created from JavaRDD<Row> copies all columns data into first column

I have a DataFrame that I need to convert into a JavaRDD<Row> and back into a DataFrame. I have the following code:

DataFrame sourceFrame = hiveContext.read().format("orc").load("/path/to/orc/file");
// I do an order by on sourceFrame above and then convert it into a JavaRDD
JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row, Row>() {
    public Row call(Row row) throws Exception {
        if (row != null) {
            // updated row by creating new Row
            return RowFactory.create(updateRow);
        }
        return null;
    }
});
// now I convert the JavaRDD<Row> above into a DataFrame using the following
DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD, schema);

sourceFrame and modifiedFrame have the same schema. When I call sourceFrame.show(), the output is as expected: every column has its corresponding values and no column is empty. But when I call modifiedFrame.show(), all the column values are merged into the first column. For example, assume the source DataFrame has 3 columns as shown below:

_col1    _col2    _col3
 ABC       10      DEF
 GHI       20      JKL

When I print modifiedFrame, which I converted from the JavaRDD, it shows the following:

_col1        _col2      _col3
ABC,10,DEF
GHI,20,JKL

As shown above, _col1 holds all the values, and _col2 and _col3 are empty. I don't know what is wrong.

Umesh K asked Jul 22 '15 19:07


1 Answer

As I mentioned in a comment on the question:

This probably happens because the list is passed as a single parameter.

return RowFactory.create(updateRow);

In the Apache Spark docs and source code, the example that specifies a schema passes the values for each column one by one. RowFactory.create takes a variable number of arguments (Object... values), and a rough look at RowFactory.java and the GenericRow class shows that neither of them splits a single argument into separate columns, so a List passed on its own becomes the value of the first column. Try passing the value for each of the row's columns individually:

return RowFactory.create(updateRow.get(0),updateRow.get(1),updateRow.get(2)); // List Example

Alternatively, you can convert your list to an array and pass the array as the parameter:

YourObject[] updatedRowArray= new YourObject[updateRow.size()];
updateRow.toArray(updatedRowArray);
return RowFactory.create(updatedRowArray);
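
The difference between passing the list and passing an array comes from how Java handles varargs, and it can be checked without Spark. The following is only a minimal sketch: VarargsRowDemo and its create method are hypothetical stand-ins that have the same Object... parameter shape as RowFactory.create, and they simply return the values they receive so the number of "columns" can be counted.

```java
import java.util.Arrays;
import java.util.List;

public class VarargsRowDemo {

    // Hypothetical stand-in with the same varargs shape as RowFactory.create(Object... values).
    // It returns the values it received, so the array length is the number of "columns".
    public static Object[] create(Object... values) {
        return values;
    }

    public static void main(String[] args) {
        List<Object> updateRow = Arrays.<Object>asList("ABC", 10, "DEF");

        // A List is a single object, so it becomes the only element of the varargs array.
        Object[] fromList = create(updateRow);
        System.out.println(fromList.length);   // prints 1

        // An Object[] is used as the varargs array itself, one element per column.
        Object[] fromArray = create(updateRow.toArray());
        System.out.println(fromArray.length);  // prints 3
    }
}
```

In this sketch the list produces one value and the array produces three, which matches the symptom in the question: one filled column followed by empty ones.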

By the way, the RowFactory.create() method creates Row objects. The Apache Spark documentation says the following about the Row object and RowFactory.create():

Represents one row of output from a relational operator. Allows both generic access by ordinal, which will incur boxing overhead for primitives, as well as native primitive access. It is invalid to use the native primitive interface to retrieve a value that is null, instead a user must check isNullAt before attempting to retrieve a value that might be null.

To create a new Row, use RowFactory.create() in Java or Row.apply() in Scala.

A Row object can be constructed by providing field values. Example:

import org.apache.spark.sql._

// Create a Row from values.

Row(value1, value2, value3, ...)

// Create a Row from a Seq of values.

Row.fromSeq(Seq(value1, value2, ...))

According to the documentation, you can also apply your own logic to separate a row's columns when creating Row objects. But I think converting the list to an array and passing the array as the parameter will work for you. (I couldn't test this, so please post your feedback. Thanks.)

İlker Korkut answered Oct 18 '22 20:10