The other objective of using a SequenceFile is to 'pack' many small files into a single large SequenceFile for MapReduce computation, since Hadoop is designed to work with large files (remember that the default HDFS block size is 64MB).
One can easily write a program to create the necessary SequenceFile object and load it into the Hadoop Distributed File System (HDFS). For a convenient utility that converts a tar file to a Hadoop SequenceFile, see Stuart Sierra's blog post, A Million Little Files.
Based on Stuart Sierra's code, I have created a simple utility class that packs a directory of files into a Hadoop SequenceFile. The code is not foolproof, but I think it is good enough for illustration purposes:
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
public class FilesPacker {

  private Configuration conf;

  public FilesPacker(Configuration conf) {
    this.conf = conf;
  }

  /** Pack all files in a directory into a Hadoop SequenceFile */
  public void packAsSequenceFile(String input, String output) throws Exception {
    File inputDirectory = new File(input);
    if (!inputDirectory.exists())
      throw new FileNotFoundException(input + " does not exist.");
    else if (!inputDirectory.isDirectory())
      throw new InputNotDirectoryException(input + " is not a directory.");

    Path outputPath = new Path(output);
    FileSystem fileSystem = FileSystem.get(conf);
    // use block compression so that both keys and values are compressed
    SequenceFile.Writer swriter = SequenceFile.createWriter(fileSystem, conf,
        outputPath,
        Text.class, BytesWritable.class,
        SequenceFile.CompressionType.BLOCK);
    FileInputStream fis = null;
    try {
      File[] files = inputDirectory.listFiles();
      for (int i = 0; i < files.length; i++) {
        // skip subdirectories; only plain files are packed
        if (files[i].isDirectory()) continue;
        String filename = files[i].getName();
        long size = files[i].length();
        if (size > Integer.MAX_VALUE) {
          throw new FileTooLargeException("A file in the directory is too large.");
        }
        int asize = (int) size;
        byte[] content = new byte[asize];
        fis = new FileInputStream(files[i]);
        // read() may return fewer bytes than requested, so loop until the
        // whole file has been read
        int offset = 0;
        while (offset < asize) {
          int read = fis.read(content, offset, asize - offset);
          if (read < 0) break;
          offset += read;
        }
        Text key = new Text(filename);
        BytesWritable value = new BytesWritable(content);
        swriter.append(key, value);
        fis.close();
      }
    } finally {
      if (fis != null) { fis.close(); }
      if (swriter != null) { swriter.close(); }
    }
  } // packAsSequenceFile
}
and the custom exception classes:
public class FileTooLargeException extends Exception {
  private static final long serialVersionUID = 7320340946537417022L;
  public FileTooLargeException(String message) {
    super(message);
  }
}
public class InputNotDirectoryException extends Exception {
  private static final long serialVersionUID = 7320340946537417022L;
  public InputNotDirectoryException(String message) {
    super(message);
  }
}
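Using the packer is then just a matter of constructing it with a Hadoop Configuration and pointing it at a directory. Here is a minimal driver sketch; the input directory and output path below are placeholders, not part of the utility itself:
import org.apache.hadoop.conf.Configuration;

public class FilesPackerDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FilesPacker packer = new FilesPacker(conf);
    // pack everything under the local directory into one SequenceFile;
    // both paths here are illustrative placeholders
    packer.packAsSequenceFile("/tmp/small-files", "packed.seq");
  }
}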
To use SequenceFiles as the input and output of a Hadoop job, set the job's input and output format classes to SequenceFileInputFormat and SequenceFileOutputFormat as follows:
...
Job yourJob = new Job(conf, "My MapReduce Job");
yourJob.setInputFormatClass(SequenceFileInputFormat.class);
yourJob.setOutputFormatClass(SequenceFileOutputFormat.class);
...
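Since the packer writes Text keys (the original filenames) and BytesWritable values (the raw file contents), a mapper over the packed file receives exactly those types. The following is a minimal sketch of such a mapper; the job it implies, which simply reports each file's size in bytes, and the output types are my own choices for illustration:
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PackedFileMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {
  @Override
  protected void map(Text filename, BytesWritable content, Context context)
      throws IOException, InterruptedException {
    // getLength() gives the count of valid bytes; getBytes() may be padded
    context.write(filename, new IntWritable(content.getLength()));
  }
}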
You will probably find SequenceFile very useful when you are dealing with binary data such as arrays of your own data objects. Of course, you will need a helper class to marshal the objects (or arrays of objects) into byte arrays. The following is a simple class for that:
public class ObjectConvertor {
  /** Converts an object to an array of bytes.
   * @param object the object to convert.
   * @return the associated byte array.
   */
  public static byte[] toBytes(Object object) {
    java.io.ByteArrayOutputStream baos = new java.io.ByteArrayOutputStream();
    try {
      java.io.ObjectOutputStream oos = new java.io.ObjectOutputStream(baos);
      oos.writeObject(object);
      // close (and thereby flush) the stream so the byte array is complete
      oos.close();
    } catch (java.io.IOException ioe) {
      System.err.println(ioe.getMessage());
    }
    return baos.toByteArray();
  }

  /** Converts an array of bytes back to its constituent object.
   * The input array is assumed to have been created from the original object.
   * @param bytes the byte array to convert.
   * @return the associated object.
   */
  public static Object toObject(byte[] bytes) {
    Object object = null;
    try {
      object = new java.io.ObjectInputStream(
          new java.io.ByteArrayInputStream(bytes)).readObject();
    } catch (java.io.IOException ioe) {
      System.err.println(ioe.getMessage());
    } catch (java.lang.ClassNotFoundException cnfe) {
      System.err.println(cnfe.getMessage());
    }
    return object;
  }
}
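A round trip through the convertor might look like the sketch below; MyData is a hypothetical Serializable class used only for illustration. Note that BytesWritable.getBytes() can return a padded buffer, so only the first getLength() bytes should be deserialized:
import java.io.Serializable;
import java.util.Arrays;
import org.apache.hadoop.io.BytesWritable;

public class ObjectConvertorDemo {
  // a hypothetical Serializable value type, purely for illustration
  static class MyData implements Serializable {
    private static final long serialVersionUID = 1L;
    final int value;
    MyData(int value) { this.value = value; }
  }

  public static void main(String[] args) {
    MyData original = new MyData(42);

    // marshal the object into a BytesWritable, ready for swriter.append(key, value)
    BytesWritable writable = new BytesWritable(ObjectConvertor.toBytes(original));

    // unmarshal: copy only the valid bytes, since getBytes() may be padded
    byte[] valid = Arrays.copyOf(writable.getBytes(), writable.getLength());
    MyData restored = (MyData) ObjectConvertor.toObject(valid);
    System.out.println(restored.value); // prints 42
  }
}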
I hope this short discussion of the Hadoop SequenceFile has been useful.