I am looking for an example which is using the new API to read and write Sequence Files.
Effectively I need to know how to use these functions
createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts)
The old method signature is not working for me:
SequenceFile.createWriter( fs, conf, path, key.getClass(), value.getClass());
Similarly, I need to know the code for reading a sequence file, as the following is deprecated:
SequenceFile.Reader(fs, path, conf);
Here is the way to use the same -

String uri = args[0];
Configuration conf = new Configuration();
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
CompressionCodec codec = new GzipCodec();
SequenceFile.Writer writer = null;
try {
    Option optPath = SequenceFile.Writer.file(path);
    Option optKey = SequenceFile.Writer.keyClass(key.getClass());
    Option optVal = SequenceFile.Writer.valueClass(value.getClass());
    Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, codec);
    writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
    writer.append(key, value);
} finally {
    // close the writer even on error (IOUtils is org.apache.hadoop.io.IOUtils)
    IOUtils.closeStream(writer);
}
A SequenceFile is a flat, binary file type that serves as a container for data to be used in Apache Hadoop distributed computing projects. SequenceFiles are used extensively with MapReduce.
Hadoop I/O supports three SequenceFile formats: an "Uncompressed" format, a "Record-Compressed" format, and a "Block-Compressed" format.
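To see why block compression usually outperforms record compression, here is a small self-contained illustration (plain java.util.zip, not the Hadoop codec API; the CompressionDemo class and the sample payload are mine): compressing many small records together gives the codec far more redundancy to exploit than compressing each record on its own.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

public class CompressionDemo {

    // Deflate a byte array and return the compressed bytes
    static byte[] deflate(byte[] input) {
        Deflater d = new Deflater();
        d.setInput(input);
        d.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!d.finished()) {
            out.write(buf, 0, d.deflate(buf));
        }
        d.end();
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] record = "some repetitive log line payload ".getBytes();
        int recordTotal = 0;
        byte[] block = new byte[record.length * 100];

        // "Record-compressed": each record deflated separately;
        // "Block-compressed": 100 records concatenated, then deflated once
        for (int i = 0; i < 100; i++) {
            recordTotal += deflate(record).length;
            System.arraycopy(record, 0, block, i * record.length, record.length);
        }
        int blockTotal = deflate(block).length;

        System.out.println("record-compressed total: " + recordTotal);
        System.out.println("block-compressed total:  " + blockTotal);
    }
}
```

The same effect is why SequenceFile's CompressionType.BLOCK generally yields smaller files than CompressionType.RECORD for small keys and values.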
Reading a SequenceFile with the command-line interface: Hadoop provides the hadoop fs -text command to display the contents of a sequence file in text format.
To read a SequenceFile using the Java API in Hadoop, create an instance of SequenceFile.Reader. Using that reader instance you can iterate over the (key, value) pairs in the SequenceFile with the next() method. You can then read a previously written SequenceFile using the following code.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class SequenceFilesTest {
    @Test
    public void testSeqFileReadWrite() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.getLocal(conf);
        Path seqFilePath = new Path("file.seq");

        // Write two (Text, IntWritable) pairs using the new option-based API
        SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                Writer.file(seqFilePath), Writer.keyClass(Text.class),
                Writer.valueClass(IntWritable.class));
        writer.append(new Text("key1"), new IntWritable(1));
        writer.append(new Text("key2"), new IntWritable(2));
        writer.close();

        // Read the pairs back with the new Reader constructor
        SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                Reader.file(seqFilePath));
        Text key = new Text();
        IntWritable val = new IntWritable();
        while (reader.next(key, val)) {
            System.err.println(key + "\t" + val);
        }
        reader.close();
    }
}
I'm late by more than a year to answer, but I just got started with Hadoop 2.4.1 :)
Below is the code; someone may find it useful.
Note: it includes the commented-out 1.x code for reading and writing a sequence file. I was wondering where it picks up the file system, but when I executed it directly on the cluster, it picked it up properly (probably from core-site.xml, as mentioned in Configuration).
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFileOperator {

    private Configuration conf = new Configuration();

    /*private FileSystem fs;
    {
        try {
            fs = FileSystem.get(URI.create("hdfs://cldx-1336-1202:9000"), conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }*/

    public static void main(String[] args) throws IOException {
        if (args == null || args.length < 2) {
            System.out.println("Following are the possible invocations <operation id> <arg1> <arg2> ...");
            System.out.println("1 <absolute path of directory containing documents> <HDFS path of the sequence file>");
            System.out.println("2 <HDFS path of the sequence file>");
            return;
        }

        int operation = Integer.parseInt(args[0]);

        SequenceFileOperator docToSeqFileWriter = new SequenceFileOperator();

        switch (operation) {
        case 1: {
            if (args.length < 3) {
                System.out.println("Operation 1 needs both a document directory and a sequence file path");
                return;
            }
            String docDirectoryPath = args[1];
            String sequenceFilePath = args[2];

            System.out.println("Writing files present at " + docDirectoryPath
                    + " to the sequence file " + sequenceFilePath);

            docToSeqFileWriter.loadDocumentsToSequenceFile(docDirectoryPath,
                    sequenceFilePath);
            break;
        }
        case 2: {
            String sequenceFilePath = args[1];

            System.out.println("Reading the sequence file " + sequenceFilePath);

            docToSeqFileWriter.readSequenceFile(sequenceFilePath);
            break;
        }
        }
    }

    private void readSequenceFile(String sequenceFilePath) throws IOException {
        /*
         * 1.x API:
         * SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(fs,
         * new Path(sequenceFilePath), conf);
         */
        // 2.x option-based API
        Option filePath = SequenceFile.Reader.file(new Path(sequenceFilePath));
        SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(conf,
                filePath);

        // Instantiate key/value holders from the classes recorded in the file header
        Writable key = (Writable) ReflectionUtils.newInstance(
                sequenceFileReader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(
                sequenceFileReader.getValueClass(), conf);

        try {
            while (sequenceFileReader.next(key, value)) {
                System.out.printf("[%s] %s %s \n",
                        sequenceFileReader.getPosition(), key,
                        value.getClass());
            }
        } finally {
            IOUtils.closeStream(sequenceFileReader);
        }
    }

    private void loadDocumentsToSequenceFile(String docDirectoryPath,
            String sequenceFilePath) throws IOException {

        File docDirectory = new File(docDirectoryPath);

        if (!docDirectory.isDirectory()) {
            System.out.println("Please provide an absolute path of a directory that contains the documents to be added to the sequence file");
            return;
        }

        /*
         * 1.x API:
         * SequenceFile.Writer sequenceFileWriter =
         * SequenceFile.createWriter(fs, conf, new Path(sequenceFilePath),
         * Text.class, BytesWritable.class);
         */
        // 2.x option-based API
        org.apache.hadoop.io.SequenceFile.Writer.Option filePath = SequenceFile.Writer
                .file(new Path(sequenceFilePath));
        org.apache.hadoop.io.SequenceFile.Writer.Option keyClass = SequenceFile.Writer
                .keyClass(Text.class);
        org.apache.hadoop.io.SequenceFile.Writer.Option valueClass = SequenceFile.Writer
                .valueClass(BytesWritable.class);

        SequenceFile.Writer sequenceFileWriter = SequenceFile.createWriter(
                conf, filePath, keyClass, valueClass);

        File[] documents = docDirectory.listFiles();

        try {
            for (File document : documents) {
                RandomAccessFile raf = new RandomAccessFile(document, "r");
                byte[] content = new byte[(int) raf.length()];
                raf.readFully(content);

                // Key: file name, Value: raw file contents
                sequenceFileWriter.append(new Text(document.getName()),
                        new BytesWritable(content));
                raf.close();
            }
        } finally {
            IOUtils.closeStream(sequenceFileWriter);
        }
    }
}
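The per-document read in loadDocumentsToSequenceFile can be isolated into a small helper; below is a plain-JDK sketch (the FileToBytes class and readAll name are mine, not part of the Hadoop API), using try-with-resources so the file handle is closed even on error. When reading the BytesWritable back out of the sequence file, remember that getBytes() returns a padded internal buffer, so use copyBytes() (or getBytes() together with getLength()) to recover the original contents.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

public class FileToBytes {

    // Read a whole file into a byte[], mirroring the per-document
    // read pattern used in loadDocumentsToSequenceFile above
    static byte[] readAll(File f) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(f, "r")) {
            byte[] content = new byte[(int) raf.length()];
            raf.readFully(content);
            return content;
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo: write a small temp file and read it back
        File tmp = File.createTempFile("doc", ".txt");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write("hello".getBytes());
        }
        System.out.println(readAll(tmp).length); // prints 5
        tmp.delete();
    }
}
```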