I try to read a csv-file in spark and I want to split the lines, which are comma-seperated, so that I have an RDD with a two dimensional Array. I am very new to Spark.
I tried to do this:
public class SimpleApp
{
public static void main(String[] args) throws Exception
{
String master = "local[2]";
String csvInput = "/home/userName/Downloads/countrylist.csv";
String csvOutput = "/home/userName/Downloads/countrylist";
JavaSparkContext sc = new JavaSparkContext(master, "loadwholecsv", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> csvData = sc.textFile(csvInput, 1);
JavaRDD<String> words = csvData.map(new Function <List<String>>() { //line 43
@Override
public List<String> call(String s) {
return Arrays.asList(s.split("\\s*,\\s*"));
}
});
words.saveAsTextFile(csvOutput);
}
}
This should split the lines and return an ArrayList. But I am not sure about this. I get this error:
SimpleApp.java:[43,58] wrong number of type arguments; required 2
Spark Map function takes one element as input process it according to custom code (specified by the developer) and returns one element at a time. Map transforms an RDD of length N into another RDD of length N. The input and output RDDs will typically have the same number of records.
Spark DataFrame columns support arrays and maps, which are great for data sets that have an arbitrary length.
Both of the functions map() and flatMap are used for transformation and mapping operations. map() function produces one output for one input value, whereas flatMap() function produces an arbitrary no of values as output (ie zero or more than zero) for each input value.
So there are a two small issues with the program. First is you probably want flatMap rather than map, since you are trying to return an RDD of words rather than an RDD of Lists of words, we can use flatMap to flatten the result. The other is, our function class also requires the type of the input it is called on. I'd replace the JavaRDD words... with:
JavaRDD<String> words = rdd.flatMap(
new FlatMapFunction<String, String>() { public Iterable<String> call(String s) {
return Arrays.asList(s.split("\\s*,\\s*"));
}});
This is the sample of code from https://opencredo.com/data-analytics-using-cassandra-and-spark/ tutorial in Java.
Scala code :
/* 1*/ val includedStatuses = Set("COMPLETED", "REPAID")
/* 2*/ val now = new Date();
/* 3*/ sc.cassandraTable("cc", "cc_transactions")
/* 4*/ .select("customerid", "amount", "card", "status", "id")
/* 5*/ .where("id < minTimeuuid(?)", now)
/* 6*/ .filter(includedStatuses contains _.getString("status"))
/* 7*/ .keyBy(row => (row.getString("customerid"), row.getString("card")))
/* 8*/ .map { case (key, value) => (key, value.getInt("amount")) }
/* 9*/ .reduceByKey(_ + _)
/*10*/ .map { case ((customerid, card), balance) => (customerid, card, balance, now) }
/*11*/ .saveToCassandra("cc", "cc_balance", SomeColumns("customerid", "card", "balance", "updated_at"))
Java code :
SparkContextJavaFunctions functions = CassandraJavaUtil.javaFunctions(ProjectPropertie.context);
JavaRDD<Balance> balances = functions.cassandraTable(ProjectPropertie.KEY_SPACE, Transaction.TABLE_NAME)
.select("customerid", "amount", "card", "status", "id")
.where("id < minTimeuuid(?)", date)
.filter( row -> row.getString("status").equals("COMPLETED") )
.keyBy(row -> new Tuple2<>(row.getString("customerid"), row.getString("card")))
.mapToPair( row -> new Tuple2<>(row._1,row._2.getInt("amount")))
.reduceByKey( (i1,i2) -> i1.intValue()+i2.intValue())
.flatMap(new FlatMapFunction<Tuple2<Tuple2<String, String>, Integer>, Balance>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public Iterator<Balance> call(Tuple2<Tuple2<String, String>, Integer> r) throws Exception {
List<Balance> list = new ArrayList<Balance>();
list.add(new Balance(r._1._1, r._1._2, r._2,reportDate));
return list.iterator();
}
}).cache();
Where ProjectPropertie.context
is SparkContext
Here is how you can get SparkContext (only one context per JVM you should use):
SparkConf conf = new SparkConf(true).setAppName("App_name").setMaster("local[2]").set("spark.executor.memory", "1g")
.set("spark.cassandra.connection.host", "127.0.0.1,172.17.0.2")
.set("spark.cassandra.connection.port", "9042")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra");
SparkContext context = new SparkContext(conf);
For datasource I'm using Cassandra, where 172.17.0.2 is docker container where my Cassandra node is running and 127.0.0.1 is the host (in this case is local)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With