
Concurrent job execution in Spark

My input data has the following format:

0
1
2
3
4
5
…
14

Input Location: hdfs://localhost:9000/Input/datasource

I have used the following code to save each RDD as a text file from multiple threads:

package org.apache.spark.examples;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.avro.ipc.specific.Person;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

class RunnableDemo implements Runnable
{

    private Thread t;
    private String threadName;
    private String path;
    private JavaRDD<String> javaRDD;
//  private JavaSparkContext javaSparkContext;

    RunnableDemo(String threadName,JavaRDD<String> javaRDD,String path)
    {
        this.threadName=threadName;
        this.javaRDD=javaRDD;
        this.path=path;
//      this.javaSparkContext=javaSparkContext;
    }


    @Override
    public void run() {
        System.out.println("Running " +  threadName );      
        try {
            this.javaRDD.saveAsTextFile(path);
//          System.out.println(this.javaRDD.count());
            Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.println("Thread " +  threadName + " interrupted.");
                }
         System.out.println("Thread " +  threadName + " exiting.");
//       this.javaSparkContext.stop();
    }

    public void start ()
       {
          System.out.println("Starting " +  threadName );
          if (t == null)
          {
             t = new Thread (this, threadName);
             t.start ();
          }
       }

}

public class SparkJavaTest {



    public static void main(String[] args) {

        //Spark Configurations:

        SparkConf sparkConf=new SparkConf().setAppName("SparkJavaTest");

        JavaSparkContext ctx=new JavaSparkContext(sparkConf);

        SQLContext sqlContext = new SQLContext(ctx);        

        JavaRDD<String> dataCollection=ctx.textFile("hdfs://yarncluster/Input/datasource");




        List<StructField> fields= new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("Id", DataTypes.IntegerType,true));

        JavaRDD<Row> rowRDD =dataCollection.map(
                new Function<String, Row>() {
                    @Override
                    public Row call(String record) throws Exception {
                        String[] fields = record.split("\u0001");                       
                        return RowFactory.create(Integer.parseInt(fields[0].trim()));
                    }                   
                });     

        StructType schema = DataTypes.createStructType(fields);

        DataFrame dataFrame =sqlContext.createDataFrame(rowRDD, schema);        
        dataFrame.registerTempTable("data");

        long recordsCount=dataFrame.count();        
        long splitRecordsCount=5;
        long splitCount =recordsCount/splitRecordsCount;
        List<JavaRDD<Row>> list1=new ArrayList<JavaRDD<Row>>();

        for(int i=0;i<splitCount;i++)
        {
            long start = i*splitRecordsCount;
            long end = (i+1)*splitRecordsCount;         
            DataFrame temp=sqlContext.sql("SELECT * FROM data WHERE Id >="+ start +" AND Id < " + end);         
            list1.add(temp.toJavaRDD());
        }       

        long length =list1.size();

        int split=0;

        for (int i = 0; i < length; i++) {

            JavaRDD rdd1 =list1.get(i);

            JavaPairRDD rdd3=rdd1.cartesian(rdd1);

            JavaPairRDD<Row,Row> rdd4=rdd3.filter(
                    new Function<Tuple2<Row,Row>,Boolean>()
                    {
                        public Boolean call(Tuple2<Row,Row> s)
                        {
                            Row line1=s._1;
                            Row line2=s._2;

                            long app1 = Integer.parseInt(line1.get(0).toString());

                            long app2 = Integer.parseInt(line2.get(0).toString());


                            if(app1<app2)
                            {
                                return true;
                            }
                            return false;
                        }
                    });

            JavaRDD<String> test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                @Override
                public String call(Tuple2<Row, Row> s)
                        throws Exception {

                    Row data1=s._1;
                    Row data2=s._2;

                    int x =Integer.parseInt(data1.get(0).toString());
                    int y =Integer.parseInt(data2.get(0).toString());

                    String result =x +","+ y+","+(x+y);
                    return result;
                }
            });

            RunnableDemo R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

            R.start();
            split++;            

            int index =i;

            while(index<length)
            {
                JavaRDD rdd2 =list1.get(index);
                 rdd3=rdd1.cartesian(rdd2);

                 rdd4=rdd3.filter(
                        new Function<Tuple2<Row,Row>,Boolean>()
                        {
                            public Boolean call(Tuple2<Row,Row> s)
                            {
                                Row line1=s._1;
                                Row line2=s._2;

                                long app1 = Integer.parseInt(line1.get(0).toString());

                                long app2 = Integer.parseInt(line2.get(0).toString());


                                if(app1<app2)
                                {
                                    return true;
                                }
                                return false;
                            }
                        });         

                test=rdd4.map(new Function<Tuple2<Row,Row>, String>() {
                    @Override
                    public String call(Tuple2<Row, Row> s)
                            throws Exception {

                        Row data1=s._1;
                        Row data2=s._2;

                        int x =Integer.parseInt(data1.get(0).toString());
                        int y =Integer.parseInt(data2.get(0).toString());

                        String result =x +","+ y+","+(x+y);
                        return result;
                    }
                });         

                R =new RunnableDemo("Thread-"+split,test,"hdfs://yarncluster/GettingStarted/Output/"+split);

                R.start();
                split++;            
                index++;                
            }
        }
    }

}

In this case I am facing the following exception:

[screenshot of the exception stack trace]

I have tried the solution provided in the following question:

How to run concurrent jobs(actions) in Apache Spark using single spark context

But I still can't resolve the issue.

Could you please guide me on how to resolve this?

Asked Jun 16 '15 by Raja

1 Answer

First of all, you're trying to execute all the work on the driver node using several threads. This is not really in the spirit of Spark, since each unit of work in your case is independent of the others and can be executed on a different machine. You have a toy example here, but this becomes really important with large volumes of data.

A better approach would be to use something like mapPartitions to send the range of keys to each worker, let the workers run the corresponding queries, and save the results, with one thread per worker. This makes the code cleaner and easier to reason about (once you get used to the way RDDs work). You would obviously need to set the level of parallelism and the number of partitions (discussed here) appropriately for your input data.
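
As an illustration of that idea, here is a minimal sketch (Spark 1.x Java API, matching the question) that keys each record by its range and lets Spark process each range as part of a single distributed job. It uses groupByKey rather than mapPartitions, only reproduces the within-range pairing from the question's outer loop, and the class name, range size, and output path are assumptions for the example, not code from the answer:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class RangePairsJob {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("RangePairsJob");
        JavaSparkContext ctx = new JavaSparkContext(conf);

        JavaRDD<String> lines = ctx.textFile("hdfs://yarncluster/Input/datasource");

        final long rangeSize = 5; // same split size as in the question

        // Key each record by the range its Id falls into (0-4 -> 0, 5-9 -> 1, ...).
        JavaPairRDD<Long, Integer> keyed = lines.mapToPair(
                new PairFunction<String, Long, Integer>() {
                    @Override
                    public Tuple2<Long, Integer> call(String record) {
                        int id = Integer.parseInt(record.split("\u0001")[0].trim());
                        return new Tuple2<Long, Integer>(id / rangeSize, id);
                    }
                });

        // Each range is handled as one group on a worker: emit "x,y,x+y"
        // for every pair with x < y, like the per-range jobs in the question.
        JavaRDD<String> results = keyed.groupByKey().flatMap(
                new FlatMapFunction<Tuple2<Long, Iterable<Integer>>, String>() {
                    @Override
                    public Iterable<String> call(Tuple2<Long, Iterable<Integer>> range) {
                        List<Integer> ids = new ArrayList<Integer>();
                        for (Integer id : range._2()) {
                            ids.add(id);
                        }
                        List<String> out = new ArrayList<String>();
                        for (int x : ids) {
                            for (int y : ids) {
                                if (x < y) {
                                    out.add(x + "," + y + "," + (x + y));
                                }
                            }
                        }
                        return out;
                    }
                });

        // One action, one output directory; Spark distributes the ranges itself.
        results.saveAsTextFile("hdfs://yarncluster/GettingStarted/Output/combined");

        ctx.stop();
    }
}

The point of the sketch is that the driver submits a single job and the cluster, not a pool of driver threads, decides where each range is processed.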

The immediate issue with your code is that the main thread starts the other threads but doesn't wait for them to finish, so the driver can shut down (taking the SparkContext with it) while the save jobs are still running, and the spawned threads terminate along with it (see the javadoc). Notice how, in the answer to the linked question, the main function calls get() on the spawned futures before returning.
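
For reference, a minimal sketch of that pattern: submit each save to an ExecutorService and block on every future before the driver returns. The helper class, method name, and baseDir parameter are assumptions for the example, not code from the linked answer.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.spark.api.java.JavaRDD;

final class SaveJobs {

    // Saves each per-range RDD on its own thread and blocks until every job
    // has finished, so the driver does not exit while saves are still running.
    static void saveAllAndWait(List<JavaRDD<String>> splits, final String baseDir)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(splits.size());
        List<Future<?>> futures = new ArrayList<Future<?>>();

        for (int i = 0; i < splits.size(); i++) {
            final JavaRDD<String> rdd = splits.get(i);
            final String path = baseDir + "/" + i;
            futures.add(pool.submit(new Runnable() {
                @Override
                public void run() {
                    rdd.saveAsTextFile(path); // each save runs as its own Spark job
                }
            }));
        }

        // The step the original code is missing: wait on the futures so the
        // method cannot return while jobs are still in flight.
        for (Future<?> f : futures) {
            f.get();
        }
        pool.shutdown();
    }
}

In the question's main method, the per-range test RDDs could be collected into a list and passed to something like saveAllAndWait(splits, "hdfs://yarncluster/GettingStarted/Output") before main() returns and the SparkContext is stopped.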

Answered Sep 20 '22 by dskrvk