I am new to Spark, and I want to use group-by & reduce to find the following from a CSV (one line per employee):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to simplify the above CSV with a group by on Department, Designation, State, with additional columns for sum(costToCompany) and TotalEmployeeCount.
Should get a result like:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
Is there any way to achieve this using transformations and actions, or should we go for RDD operations?
Spark provides csv("path") on the DataFrameReader for reading a CSV file into a DataFrame, and dataframeObj.write().csv("path") for saving or writing to a CSV file. Spark supports reading pipe-, comma-, tab-, and other delimiter/separator-separated files.
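As a minimal sketch in the Java API (assuming the SparkSession spark created in the step below; the paths are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read a delimited file into a DataFrame; "delimiter" also accepts "|", "\t", etc.
Dataset<Row> input = spark.read()
    .option("delimiter", ",")
    .csv("hdfs://path/input.csv");

// Write the DataFrame back out as CSV (output path is illustrative)
input.write()
    .option("delimiter", ",")
    .csv("hdfs://path/output");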
A Spark RDD can be converted to a DataFrame using toDF(), createDataFrame(), or by transforming an RDD[Row] plus a schema into a DataFrame.
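A hedged sketch of the createDataFrame() route in Java (assuming a JavaSparkContext sc, the SparkSession spark, and the StructType schema defined further down; the sample line comes from the question's data):

import java.util.Arrays;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Build Rows from raw lines, then apply the schema to get a DataFrame
JavaRDD<Row> rowRdd = sc.parallelize(Arrays.asList("Sales,Trainee,12000,UP"))
    .map(line -> {
        String[] f = line.split(",");
        return RowFactory.create(f[0].trim(), f[1].trim(),
                Long.parseLong(f[2].trim()), f[3].trim());
    });
Dataset<Row> fromRdd = spark.createDataFrame(rowRdd, schema);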
A CSV file can be parsed with Spark's built-in CSV reader. It returns a DataFrame/Dataset on a successful read of the file. On top of a DataFrame/Dataset, you can easily apply SQL-like operations.
SparkSession
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL Example")
.getOrCreate();
StructType
import org.apache.spark.sql.types.StructType;
StructType schema = new StructType()
.add("department", "string")
.add("designation", "string")
.add("ctc", "long")
.add("state", "string");
Dataset<Row> df = spark.read()
.option("mode", "DROPMALFORMED")
.schema(schema)
.csv("hdfs://path/input.csv");
More options for reading data from a CSV file (header, delimiter, and so on) can be set with option() on the reader.
1. SQL way
Register a table in the Spark SQL metastore to perform SQL operations:
df.createOrReplaceTempView("employee");
Run a SQL query on the registered table:
Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)"
        + " FROM employee GROUP BY department, designation, state");
sqlResult.show(); // for testing
We can even execute SQL directly on the CSV file, without creating a table, using Spark SQL.
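A minimal sketch of that (the FROM csv.`path` syntax queries the file as a table; since the raw file carries no column names, Spark exposes them as _c0, _c1, ...; the path is the one used above):

Dataset<Row> direct = spark.sql(
    "SELECT _c0 AS department, _c1 AS designation, _c3 AS state,"
        + " SUM(_c2) AS totalCost, COUNT(*) AS empCount"
        + " FROM csv.`hdfs://path/input.csv`"
        + " GROUP BY _c0, _c1, _c3");
direct.show();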
2. Object chaining / programmatic (Java-like) way
Do the necessary imports for the SQL functions:
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;
Use groupBy and agg on the DataFrame/Dataset to perform count and sum on the data:
Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// Since Spark 1.6, columns mentioned in groupBy are included in the result by default
dfResult.show(); // for testing
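For the sample data above, both approaches produce the rows the question expects, e.g. Sales, Lead, AP with totalCost 64000 and empCount 2 (column order follows the aggregation expressions).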
"org.apache.spark" % "spark-core_2.11" % "2.0.0"
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
Create a class (schema) to encapsulate your structure (it's not required for approach B, but it will make your code easier to read if you are using Java):
public class Record implements Serializable {
String department;
String designation;
long costToCompany;
String state;
    // constructor, getters and setters
}
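For completeness, a minimal constructor matching the call in the map function below would go inside the Record class (getters and setters elided, as in the original):

public Record(String department, String designation, long costToCompany, String state) {
    this.department = department;
    this.designation = designation;
    this.costToCompany = costToCompany;
    this.state = state;
}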
Loading the CSV (or JSON) file:
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("EmployeeAggregation")); // appName is illustrative
JavaRDD<String> data = sc.textFile("path/input.csv");
//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
JavaRDD<Record> rdd_records = data.map(
    new Function<String, Record>() {
        public Record call(String line) throws Exception {
            // Here you could also parse JSON instead:
            // Gson gson = new Gson();
            // gson.fromJson(line, Record.class);
            String[] fields = line.split(",");
            // costToCompany is declared as long, so parse the numeric field
            return new Record(fields[0].trim(), fields[1].trim(),
                    Long.parseLong(fields[2].trim()), fields[3].trim());
        }
    });
At this point you have 2 approaches:
Approach A: Register a table (using your defined schema class)
JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();
Query the table with your desired group-by query:
JavaSchemaRDD res = sqlContext.sql(
    "SELECT department, designation, state, SUM(costToCompany), COUNT(*)"
        + " FROM record_table"
        + " GROUP BY department, designation, state");
Here you would also be able to run any other query you desire, using the SQL approach.
Approach B: Mapping using a composite key: Department, Designation, State
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(
        new PairFunction<Record, String, Tuple2<Long, Integer>>() {
            public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
                Tuple2<String, Tuple2<Long, Integer>> t2 =
                    new Tuple2<String, Tuple2<Long, Integer>>(
                        // join key parts with a delimiter so concatenated keys stay unambiguous
                        record.department + ";" + record.designation + ";" + record.state,
                        new Tuple2<Long, Integer>(record.costToCompany, 1));
                return t2;
            }
        });
reduceByKey using the composite key, summing the costToCompany column and accumulating the number of records per key:
JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(
        new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
            public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
                    Tuple2<Long, Integer> v2) throws Exception {
                return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
            }
        });
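To inspect the aggregated result on the driver, a minimal sketch (collect() is fine for small outputs like this example):

for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
    // key -> (totalCost, empCount)
    System.out.println(entry._1 + " -> totalCost=" + entry._2._1
            + ", empCount=" + entry._2._2);
}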