Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to filter a dataset according to datetime values in Spark

I am trying to filter my data according to it's datetime field. A sample from my data:

303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"

This is how I initialize spark:

    SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
    JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

Firstly, I read the data above into my custom object like below:

    // Read data from file into custom object
    JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(","); // Split line from commas

                // read each data into custom object
                CurrencyPair cp = new CurrencyPair();
                cp.setId(Integer.parseInt(fields[0].trim()));
                cp.setValue(Double.parseDouble(fields[1].trim()));
                cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
                cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
                cp.setTimeStamp(new Date(fields[4].trim()));

                System.out.println("Date:" + fields[4].trim()); // To see if it will print or not

                return cp;
            }
        }
     );

In order to get the data which has timestamp bigger than a certain time, I wrote this filter:

    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.DAY_OF_MONTH, -10); // This is for test issue

    // My filter to get the data for a certain time range
    Function<CurrencyPair, Boolean> filter = new Function<CurrencyPair, Boolean>() {
        @Override
        public Boolean call(CurrencyPair currencyPair) throws Exception {
            if(calendar.getTime().compareTo(currencyPair.getTimeStamp()) > 0){
                return false;
            }else{
                return true;
            }
        }
    };

This is how my custom object looks like:

public class CurrencyPair implements java.io.Serializable {

    private int id;
    private double value;
    private double baseVolume;
    private double quoteVolume;
    private Date timeStamp;

    // all getters and setters are here, but no constructor
}

To check the results of my filter, I tried to see some of them(first 100 here):

Iterator<CurrencyPair> result = rdd.repartition(100).filter(filter).toLocalIterator();
int counter = 0;
while (counter < 100 && result.hasNext()){
    System.out.println("Here: " + result.next());
    counter++;
}

But the problem is that when I run my code I got the following exception at the line where I write the first 100 result (Here: System.out.println("Here: " + result.next());)

Error:

19/05/12 00:05:47 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 192.168.1.124, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

In my filter, I also write the datetime string to console with using System.out.println, but I can not see the result of it in the console as well. What am I doing wrong? How can I achieve this?


Edit: I noticed that I actually downloaded spark 2.3.0 version but in my maven file I was using 2.4.2. So I changed my maven file to 2.3.0 version.

This time I am getting the following error:

19/05/14 00:35:35 INFO BlockManager: BlockManager stopped
19/05/14 00:35:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/14 00:35:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/14 00:35:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)
19/05/14 00:35:36 INFO SparkContext: SparkContext already stopped.
19/05/14 00:35:36 INFO SparkContext: Successfully stopped SparkContext
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)

I am getting this error at the below code line where I initialize spark context:

JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

Edit 2: It works well when I write local instead of my own spark master IP. But I need to run this on my own IP. So what can be wrong with my master node?


Edit 3: I uploaded whole error stack to code snippet that located under the first edit.

like image 917
JollyRoger Avatar asked May 08 '19 20:05

JollyRoger


1 Answers

This happens when class that defines lambda expression is not available run time. If you are trying to run job on remote cluster from local IDE, you will need to add setJars

SparkConf conf = new SparkConf()
                .setAppName("app name")
                .setJars(new String[]{"/fatJarPath/jar.path"})
                .setMaster("spark://Remote_spark_ip:port");

Alternatively you can build fat jar and submit your job using spark-submit.

like image 70
m-bhole Avatar answered Oct 09 '22 12:10

m-bhole