 

Drools In Spark for Streaming File

We were able to successfully integrate Drools with Spark. Applying Drools rules to a batch file stored in HDFS works, but we could not figure out how to apply the rules to a streaming source so that we can make decisions instantly. Below are snippets of the code for what we are trying to achieve.

Case 1: using a batch file

    SparkConf conf = new SparkConf().setAppName("sample");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
    List<String> store = new ArrayList<String>();
    store = javaRDD.collect();

Case 2: using the streaming context

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc =
              new JavaStreamingContext(sparkconf, new Duration(1));

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);

In the first case we were able to apply our rules to the variable store, but in the second case we were not able to apply rules to the DStream lines.
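For reference, in the batch case the pattern is simply: collect the RDD, then run every record through the rules. Stripped of the Spark and Drools dependencies, it reduces to the following stand-in sketch (a `Predicate` plays the role of the compiled rules here, and the "alert" condition is purely hypothetical):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class BatchRuleSketch {
    // Stand-in for ksession.execute(store): evaluate each collected record
    public static List<String> applyRule(List<String> store, Predicate<String> rule) {
        List<String> matched = new ArrayList<>();
        for (String record : store) {
            if (rule.test(record)) {
                matched.add(record);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> store = List.of("ok,100", "alert,900", "ok,50");
        // Hypothetical rule: flag records whose second field exceeds 500
        Predicate<String> rule = r -> Integer.parseInt(r.split(",")[1]) > 500;
        System.out.println(applyRule(store, rule)); // prints [alert,900]
    }
}
```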

If someone has an idea of how this can be done, it would be a great help.

asked Nov 09 '22 by beginner
1 Answer

Here is one way to get it done.

  1. Create your knowledge session with business rules first.

    // Build the knowledge base from the rule file and create a stateless session
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
    kbuilder.add( ResourceFactory.newFileResource( "rulefile.drl" ),
            ResourceType.DRL );
    // Fail fast if the DRL did not compile
    if ( kbuilder.hasErrors() ) {
        throw new IllegalStateException( kbuilder.getErrors().toString() );
    }
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
    kbase.addKnowledgePackages( pkgs );
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
    
  2. Create JavaDStream using StreamingContext.

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc =
              new JavaStreamingContext(sparkconf, new Duration(1)); // batch interval; Duration is in milliseconds
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
    
  3. Call DStream's foreachRDD to create facts and fire your rules.

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
         // Runs on the driver once per micro-batch
         List<String> facts = rdd.collect();
         // Apply rules on the collected facts
         ksession.execute(facts);
         return null;
      }
    });
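Note that the function passed to foreachRDD executes on the driver, so rdd.collect() pulls each micro-batch back to the driver before ksession.execute fires the rules; that is fine for modest batch sizes. The per-interval control flow, reduced to a stdlib-only stand-in (a list of lists plays the stream of micro-batches and a `Predicate` plays the rule base; both are illustrative, not Spark or Drools API), is:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class MicroBatchSketch {
    // Stand-in for lines.foreachRDD(...): one rule pass per micro-batch
    public static List<String> processStream(List<List<String>> microBatches,
                                             Predicate<String> rule) {
        List<String> matched = new ArrayList<>();
        for (List<String> batch : microBatches) {   // one call() per batch interval
            for (String fact : batch) {             // ksession.execute(facts) stand-in
                if (rule.test(fact)) {
                    matched.add(fact);
                }
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<List<String>> stream = List.of(
                List.of("ok,10", "alert,700"),
                List.of("alert,600", "ok,5"));
        // Hypothetical rule: match records tagged "alert"
        Predicate<String> rule = r -> r.startsWith("alert");
        System.out.println(processStream(stream, rule)); // prints [alert,700, alert,600]
    }
}
```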
    
answered Nov 14 '22 by Krishna Gajula