I am attempting to read a large text file (2 to 3 GB). I need to read the file line by line and convert each line into a JSON object. I have tried using .collect() and .toLocalIterator() to read through the file. collect() is fine for small files but will not work for large ones. I know that .toLocalIterator() pulls data scattered around the cluster back onto a single node (the driver). According to the documentation, .toLocalIterator() is ineffective for large RDDs because it runs into memory issues. Is there an efficient way to read large text files in a multi-node cluster?
Below is a method showing my various attempts at reading through the file and converting each line into JSON.
public static void jsonConversion() {
    JavaRDD<String> lines = sc.textFile(path);
    String newrows = lines.first(); // <-- reads the first line of the text file

    // Reading through with toLocalIterator() --------------------------------
    Iterator<String> newstuff = lines.toLocalIterator();
    System.out.println("line 1 " + newstuff.next());
    System.out.println("line 2 " + newstuff.next());

    // Inserting lines into a list.
    // Note: .collect() is appropriate for small files only. ------------------
    List<String> rows = lines.collect();

    // Sets the loop limit based on the number of lines in the text file.
    int count = (int) lines.count();
    System.out.println("Number of lines are " + count);

    // Using Google's Gson library to create a JSON builder.
    GsonBuilder gsonBuilder = new GsonBuilder();
    Gson gson = new GsonBuilder().setLenient().create();

    // Array list to hold the JSON strings.
    ArrayList<String> jsonList = new ArrayList<>();

    // Converting each line of the text file into a JSON-formatted string
    // and inserting it into 'jsonList'.
    for (int i = 0; i < count; i++) {
        String jsonObject = gson.toJson(rows.get(i));
        Gson prettyGson = new GsonBuilder().setPrettyPrinting().create();
        String prettyJson = prettyGson.toJson(rows.get(i));
        jsonList.add(prettyJson);
    }

    // Printing out all the JSON objects.
    int lineNumber = 1;
    for (int i = 0; i < count; i++) {
        System.out.println("line " + lineNumber + "-->" + jsonList.get(i));
        lineNumber++;
    }
}
Below is a list of the libraries that I am using:
//Spark Libraries
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
//Java Libraries
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
//Json Builder Libraries
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
You can try to use the map function on the RDD instead of collecting all of the results.
JavaRDD<String> lines = sc.textFile(path);
JavaRDD<String> jsonList = lines.map(line -> <<all your json transformations>>);
That way, you achieve a distributed transformation of your data. More about the map function can be found in the Spark documentation.
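For example, here is a minimal sketch along these lines (the method name, the JavaSparkContext parameter, and the output path are placeholders I introduced, not part of your code) that keeps the Gson conversion inside map and never collects to the driver:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public static void jsonConversionDistributed(JavaSparkContext sc, String path) {
    JavaRDD<String> lines = sc.textFile(path);

    // Gson is created inside the lambda so each executor builds its own
    // instance instead of Spark trying to serialize one from the driver.
    JavaRDD<String> jsonLines = lines.map(line -> {
        Gson gson = new GsonBuilder().setLenient().setPrettyPrinting().create();
        return gson.toJson(line);
    });

    // The write happens in parallel on the executors, so no single node
    // ever has to hold the whole 2-3 GB file in memory.
    jsonLines.saveAsTextFile(path + "-json"); // output location is just an example
}

If building a Gson instance per line turns out to be too costly, mapPartitions lets you create one instance per partition and reuse it for every line in that partition.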
Converting the data to a list or array forces the data to be collected on one node (the driver). If you want distributed computation in Spark, you need to work with an RDD, a DataFrame, or a Dataset.
JavaRDD<String> lines = sc.textFile(path);
JavaRDD<String[]> tokens = lines.map(line -> line.split("/"));
Or you can put the transformation in a lambda body inside map (or factor it out into a separate method):
JavaRDD<String> jsonList = lines.map(line -> {
    String newline = line.replace("", "");
    return newline;
});
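If the per-line logic grows, factoring it into a named static method keeps the map call readable. A minimal sketch of that variant (the class name MyJob and the helper toPrettyJson are hypothetical names, not from the original post):

// Hypothetical helper; creating Gson per call keeps the method trivially serializable.
private static String toPrettyJson(String line) {
    Gson gson = new GsonBuilder().setPrettyPrinting().create();
    return gson.toJson(line);
}

JavaRDD<String> jsonList = lines.map(MyJob::toPrettyJson);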
// Convert the JavaRDD to a DataFrame first, then save it as JSON
// (see: Converting JavaRDD to DataFrame in Spark java)
dfTobeSaved.write().format("json").save("/root/data.json");
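A minimal sketch of that last step, assuming a SparkSession named spark is already available (the variable names are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

// Wrap the JavaRDD<String> in a Dataset so the DataFrame writer can be used.
Dataset<String> dfTobeSaved = spark.createDataset(jsonList.rdd(), Encoders.STRING());

// Spark writes one JSON part-file per partition under the target directory.
dfTobeSaved.write().format("json").save("/root/data.json");

Note that a Dataset<String> has a single column named value, so each output record will look like {"value":"..."}; if you need a different schema, parse the lines into a bean or Row first.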