
NullPointerException in spark-sql

I am writing a program to join two files on a common parameter using Spark SQL. I think my code is fine, but when I try to save the result as a text file I get errors. My code is below:

import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;



import java.io.Serializable;


public class JoinCSV {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    private static final Pattern comma = Pattern.compile(",");
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        String path = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv";
        String path1 = "C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv";

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

        // Parse the first CSV into CompleteSample beans.
        JavaRDD<CompleteSample> cs = ctx.textFile(path).map(
                new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                        String[] parts = comma.split(line);

                        CompleteSample cs = new CompleteSample();
                        cs.setASSETNUM(parts[0]);
                        cs.setASSETTAG(parts[1]);
                        cs.setCALNUM(parts[2]);

                        return cs;
                    }
                });

        // Parse the second CSV into ExtendedSample beans.
        JavaRDD<ExtendedSample> es = ctx.textFile(path1).map(
                new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                        String[] parts = comma.split(line);

                        ExtendedSample es = new ExtendedSample();
                        es.setASSETNUM(parts[0]);
                        es.setCHANGEBY(parts[1]);
                        es.setCHANGEDATE(parts[2]);

                        return es;
                    }
                });

        // Infer schemas from the bean classes and register both RDDs as tables.
        JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
        complete.registerAsTable("cs");

        JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
        extended.registerAsTable("es");

        JavaSchemaRDD fs = sqlCtx.sql("SELECT ASSETTAG, CALNUM FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");
        fs.saveAsTextFile("result");                   // Here I am getting the error
    }

}

My errors are as follows:

    14/07/19 00:40:13 INFO TaskSchedulerImpl: Cancelling stage 0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:0 failed 1 times, most recent failure: Exception failure in TID 4 on host localhost: java.lang.NullPointerException
            java.lang.ProcessBuilder.start(Unknown Source)
            org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
            org.apache.hadoop.util.Shell.run(Shell.java:379)
            org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
            org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
------------
------------

and

    14/07/19 00:40:11 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:278)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:300)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:293)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:362)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
        at org.apache.spark.SparkContext$$anonfun$22.apply(SparkContext.scala:546)
-----------------
-----------------

This second error comes up everywhere, whether I am using Spark, Spark SQL, or Spark Streaming. I have no clue what this error is, but it seems to have no impact on the code, because even after this error the results come out fine. Still, it is very irritating to see an unknown error every time you run a program.

Can anybody please help me understand the issue? I am very badly stuck on this. Thanks.

asked Jul 18 '14 by Amitabh Ranjan
1 Answer

There is a workaround for the rdd.saveAsTextFile() error on Windows. It fixes both the SparkException and the IOException that I was also experiencing with Spark v1.1.0 on Windows 8.1 in local mode.

http://qnalist.com/questions/4994960/run-spark-unit-test-on-windows-7

Here are the steps from that link:

1) download the compiled winutils.exe;

2) put it somewhere like c:\winutil\bin;

3) add this line to your code, before the SparkContext is created: System.setProperty("hadoop.home.dir", "c:\\winutil\\"); (see the sketch below).
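
For example, dropped into the question's program, the property would be set at the very top of main(). This is just a minimal sketch, assuming winutils.exe was extracted to c:\winutil\bin as in step 2; point the path at wherever your own winutils directory lives:

    public static void main(String[] args) throws Exception {
        // Tell Hadoop where to find bin\winutils.exe; this should run before
        // any Spark/Hadoop classes are used. "c:\\winutil\\" is the example
        // location from step 2 -- adjust it to your own setup.
        System.setProperty("hadoop.home.dir", "c:\\winutil\\");

        JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
        // ... the rest of the job stays exactly as in the question ...
    }

With hadoop.home.dir pointing at a directory whose bin folder contains winutils.exe, both the "Could not locate executable null\bin\winutils.exe" IOException and the NullPointerException from saveAsTextFile() should go away.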

Hope this works for you.

answered Oct 05 '22 by Dylan Hogg