Sunday, June 27, 2010

Hadoop SequenceFile

Hadoop is not restricted to processing plain text data. For custom binary data types, one can use a SequenceFile. A SequenceFile is a flat file consisting of binary key/value pairs (although you can always use the Text data type for the key). In fact, Hadoop internally uses SequenceFiles to store the temporary outputs of maps.

The other objective of using SequenceFile is to 'pack' many small files into a single large SequenceFile for the MapReduce computation, since the design of Hadoop favors large files (remember that the default HDFS block size is 64MB).
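As a quick illustration of this key/value structure, here is a minimal read-back sketch (not from the original utility; it assumes the same Hadoop imports as the class shown below, and the file name packed.seq is just a placeholder):

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// Iterate over every key/value pair stored in the SequenceFile.
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("packed.seq"), conf);
try {
 Text key = new Text();
 BytesWritable value = new BytesWritable();
 while (reader.next(key, value)) {
  System.out.println(key + " : " + value.getLength() + " bytes");
 }
} finally {
 reader.close();
}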

One can easily write a program to create the necessary SequenceFile object and load it into the Hadoop Distributed File System (HDFS). For a convenient utility that converts a tar file to a Hadoop SequenceFile, see Stuart Sierra's blog post - A Million Little Files.

Based on Stuart Sierra's code, I have created a simple utility class that packs a directory of files into a Hadoop SequenceFile. The code is not foolproof, but it should be good enough for illustration purposes:



import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class FilesPacker {
 
 private Configuration conf;

 public FilesPacker(Configuration conf) {
  this.conf = conf;
 }

 /** Pack all files in a directory into a Hadoop SequenceFile */
 public void packAsSequenceFile(String input, String output) throws Exception {

  File inputDirectory = new File(input);
  if (!inputDirectory.exists()) 
   throw new FileNotFoundException();
  else if (!inputDirectory.isDirectory()) 
   throw new InputNotDirectoryException(input  + " is not a directory.");

  Path outputPath = new Path(output);
  FileSystem fileSystem = FileSystem.get(conf);
  SequenceFile.Writer swriter = SequenceFile.createWriter(fileSystem, conf, 
    outputPath,
    Text.class, BytesWritable.class,
    SequenceFile.CompressionType.BLOCK);  
  FileInputStream fis = null;
  try {
   File[] files = inputDirectory.listFiles();
   for (int i = 0; i < files.length; i++) {
    if (files[i].isFile()) {
     String filename = files[i].getName();
     long size = files[i].length();
     if (size > Integer.MAX_VALUE) {
      throw new FileTooLargeException("A file in the directory is too large.");
     }
     int asize = (int) size;
     byte[] content = new byte[asize];
     fis = new FileInputStream(files[i]);
     // a plain fis.read(content) may return before filling the buffer,
     // so read until the whole file content is in memory
     IOUtils.readFully(fis, content, 0, asize);

     Text key = new Text(filename);
     BytesWritable value = new BytesWritable(content);
     swriter.append(key, value);
     fis.close();
     fis = null;
    }
   }
  } finally {
   if (fis != null) { fis.close(); }
   if (swriter != null) { swriter.close(); }
  }
 } // packAsSequenceFile
 
}


and the custom exception classes:




public class FileTooLargeException extends Exception {

 private static final long serialVersionUID = 7320340946537417022L;

 public FileTooLargeException(String message) {
  super(message);
 }
}


public class InputNotDirectoryException extends Exception {

 private static final long serialVersionUID = 7320340946537417022L;

 public InputNotDirectoryException(String message) {
  super(message);
 }
}
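To put the packer to work, a driver only needs a Configuration and the two paths. A minimal sketch (the directory and output names here are just hypothetical examples):

Configuration conf = new Configuration();
FilesPacker packer = new FilesPacker(conf);
// Pack every file under /tmp/small-files into one SequenceFile on HDFS.
packer.packAsSequenceFile("/tmp/small-files", "packed.seq");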


To use SequenceFiles as the input and output of a Hadoop job, set the job's input and output format classes as follows:



...
Job yourJob = new Job(conf, "My MapReduce Job");
yourJob.setInputFormatClass(SequenceFileInputFormat.class);
yourJob.setOutputFormatClass(SequenceFileOutputFormat.class);
...
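With SequenceFileInputFormat, each record delivered to the mapper is one packed file: the key is the Text file name and the value is the BytesWritable content. A minimal mapper sketch (hypothetical, not from the original post) that just emits each file's size:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class PackedFileMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {

 @Override
 protected void map(Text key, BytesWritable value, Context context)
   throws IOException, InterruptedException {
  // getLength() is the actual data size; the backing buffer returned
  // by getBytes() may be longer, so never rely on its length.
  context.write(key, new IntWritable(value.getLength()));
 }
}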


You will probably find SequenceFile very useful when you are dealing with binary data such as arrays of your own data objects. Of course, you will need a helper class to marshal the objects (or arrays of objects) into a byte array. The following is a simple class for that:




import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class ObjectConvertor {

 /** Converts an object to an array of bytes.
  * @param object the object to convert.
  * @return the associated byte array.
  */
 public static byte[] toBytes(Object object) {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  try {
   ObjectOutputStream oos = new ObjectOutputStream(baos);
   oos.writeObject(object);
   oos.close(); // close to flush buffered data into the byte array
  } catch (IOException ioe) {
   System.err.println(ioe.getMessage());
  }
  return baos.toByteArray();
 }

 /** Converts an array of bytes back to its constituent object.
  * The input array is assumed to have been created from the original object.
  * @param bytes the byte array to convert.
  * @return the associated object.
  */
 public static Object toObject(byte[] bytes) {
  Object object = null;
  try {
   object = new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
  } catch (IOException ioe) {
   System.err.println(ioe.getMessage());
  } catch (ClassNotFoundException cnfe) {
   System.err.println(cnfe.getMessage());
  }
  return object;
 }
}
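One caveat when combining this helper with BytesWritable: getBytes() returns the padded backing buffer, which may be longer than the actual data, so copy only getLength() bytes before converting back. A small round-trip sketch:

int[] data = {1, 2, 3};
BytesWritable writable = new BytesWritable(ObjectConvertor.toBytes(data));
// Trim the padded buffer down to the real payload before deserializing.
byte[] exact = java.util.Arrays.copyOf(writable.getBytes(), writable.getLength());
int[] restored = (int[]) ObjectConvertor.toObject(exact);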


Hope this short discussion on Hadoop SequenceFile will be useful.
