Sunday, June 27, 2010

Hadoop I/O - the compression matters!

Although largely encapsulated by the Hadoop MapReduce framework, the impact of I/O cannot be under estimated even for the simplest implementation on the MapReduce paradigm. I have come to noticed (after many nights of investigation) that one should always use the compression options in Hadoop for better performance.

These lines are really critical in your driver codes:



  Configuration conf = new Configuration();
  // set map output and reduce output as compress
  conf.setBoolean("mapred.compress.map.output", true);
  conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
  conf.setBoolean("mapred.output.compress", true);
  conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);


The idea is simple: compressed your outputs so that there are less I/O. Especially when we are dealing with large amount of intermediary data.

So, do remember these lines!

Hadoop SequenceFile

Hadoop is not restricted to processing plain text data. For user custom binary data type, one can use the SequenceFile. SequenceFile is a flat file consisting of binary key/value pairs (although you can always use Text data type for the key). In fact, internally Hadoop uses SequenceFile to store the temporary outputs of maps.

The other objective of using SequenceFile is to 'pack' many small files into a single large SequenceFile for the MapReduce computation since the design of Hadoop prefers large files (Remember that Hadoop default block size for data is 64MB).

One can easily write a program to create the necessary SequenceFile object and load it into the Hadoop Distributed File System (HDFS). For a convenient utility to convert a tar file to Hadoop SequenceFile, please see Stuart Sierra's blog - A Million Little Files.

Based on the Stuart Sierra's code, I have created a simple utility class that pack a directory of files into a Hadoop SequenceFile. These codes are not fool proof, but I guess it will be good enough for illustration purposes:



import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class FilesPacker {
 
 private Configuration conf;

 public FilesPacker(Configuration conf) {
  this.conf = conf;
 }

 /** Pack all files in a directory into a Hadoop SequenceFile */
 public void packAsSequenceFile(String input, String output) throws Exception {

  File inputDirectory = new File(input);
  if (!inputDirectory.exists()) 
   throw new FileNotFoundException();
  else if (!inputDirectory.isDirectory()) 
   throw new InputNotDirectoryException(input  + " is not a directory.");

  Path outputPath = new Path(output);
  FileSystem fileSystem = FileSystem.get(conf);
  SequenceFile.Writer swriter = SequenceFile.createWriter(fileSystem, conf, 
    outputPath,
    Text.class, BytesWritable.class,
    SequenceFile.CompressionType.BLOCK);  
  FileInputStream fis = null;
  try {
   File[] files = inputDirectory.listFiles();
   for (int i=0; i Integer.MAX_VALUE) {
               throw new FileTooLargeException("A file in the directory is too large.");
           }
     int asize = (int)size;     
     byte[] content = new byte[asize];
     fis = new FileInputStream(files[i]);
     fis.read(content);

     Text key = new Text(filename);
     BytesWritable value = new BytesWritable(content);
     swriter.append(key, value);
     fis.close();
    }
   }
  } finally {
   if (fis != null) { fis.close(); }
   if (swriter != null) { swriter.close(); }
  }
 } // packAsSequenceFile
 
}


and the custom exception classes:




public class FileTooLargeException extends Exception {

 private static final long serialVersionUID = 7320340946537417022L;

 public FileTooLargeException(String message)
   {
  super(message);
   }

}


public class InputNotDirectoryException extends Exception {
 
 private static final long serialVersionUID = 7320340946537417022L;

 public InputNotDirectoryException(String message)
   {
  super(message);
   }

}


To use SequenceFile as the input and output for a job in Hadoop, define the FileInputFormat and FileOutputFormat as follows:



...
Job yourJob = new Job(conf, "My MapReduce Job");
yourJob.setInputFormatClass(SequenceFileInputFormat.class);
yourJob.setOutputFormatClass(SequenceFileOutputFormat.class);
...


You will probably find SequenceFile very useful when you are dealing with binary data such as arrays of your own data objects. Of course, you will need a helper class to marshal the objects (or array of objects) into byte array. The following is a simple class for that:




public class ObjectConvertor {
 /** Converts an object to an array of bytes. 
  * @param object the object to convert.
  * @return the associated byte array.
  */
 public static byte[] toBytes(Object object){
  java.io.ByteArrayOutputStream baos = new
  java.io.ByteArrayOutputStream();
  try{
   java.io.ObjectOutputStream oos = new
   java.io.ObjectOutputStream(baos);
   oos.writeObject(object);
  }catch(java.io.IOException ioe){
   System.err.println(ioe.getMessage());
  }
  return baos.toByteArray();
 }

 /** Converts an array of bytes back to its constituent object.
  * The input array is assumed to have been created from the original object.
  * @param bytes the byte array to convert.
  * @return the associated object.
  */
 public static Object toObject(byte[] bytes){
  Object object = null;
  try{
   object = new java.io.ObjectInputStream(new
     java.io.ByteArrayInputStream(bytes)).readObject();
  }catch(java.io.IOException ioe){
   System.err.println(ioe.getMessage());
  }catch(java.lang.ClassNotFoundException cnfe){
   System.err.println(cnfe.getMessage());
  }
  return object;
 } 
}


Hope this short discussion on Hadoop SequenceFile will be useful.

Tuesday, June 1, 2010

Back on Hadoop Again

Finally getting back on Hadoop again after so many months.

This time round, I am going to start afresh with Fedora 11 and Hadoop 0.20.2.

The first step was to set up JAVA.

To configure JAVA_HOME environment variable for all users in Fedora 11, as follows:

For sh/bash/ksh/zsh users, create a new .sh file in /etc/profile.d/:

su - -c 'gedit /etc/profile.d/local.sh'

Append the system-wide environment variables to the file. The variable assignments are of the form:
VARIABLE1=value
VARIABLE2=value
export VARIABLE1 VARIABLE2

Something of particular interest is the passphrase-less ssh in Fedora 11. If you follow the Hadoop quick start guide using ssh-keygen in a X Window environment, you will find that it does not work until you logout of and login again. So, the solution is simply logout and login to get your passphrase-less ssh working.

The rest are as simple as laid out in the Hadoop quick start guide.

Wednesday, November 18, 2009

It's time to do something ...

I thought I would have more time in Edinburgh to try out new stuff, but it ended up I am actually loosing touch with IT. How should i find time in the midst of these endless assignments and homework.

Friday, August 21, 2009

About Hadoop Rack Awareness feature

It was stated in the Hadoop documentation that a Hadoop cluster is rack-aware. Basically it involves writing a small script that accept DNS name (and IP) and print the desired rack_id to the stdout. This script should be configrued in the hadoop-site.xml using the property 'topology.script.name'.

This feature is not well documented like many open source projects. The following is an example of such script written in Python by Vadim Zaliva in one of the discussion:



#!/usr/bin/env python

'''
This script used by hadoop to determine network/rack topology.  It
should be specified in hadoop-site.xml via topology.script.file.name
Property.

topology.script.file.name
/home/hadoop/topology.py

'''

import sys
from string import join

DEFAULT_RACK = '/default/rack0';

RACK_MAP = { '10.72.10.1' : '/datacenter0/rack0',

'10.112.110.26' : '/datacenter1/rack0',
'10.112.110.27' : '/datacenter1/rack0',
'10.112.110.28' : '/datacenter1/rack0',

'10.2.5.1' : '/datacenter2/rack0',
'10.2.10.1' : '/datacenter2/rack1'
}

if len(sys.argv)==1:
print DEFAULT_RACK
else:
print join([RACK_MAP.get(i, DEFAULT_RACK) for i in sys.argv[1:]]," ")




Remember to include the following in the hadoop-site.xml:


<property>
 <name>topology.script.file.name</name>
 <value>/home/hadoop/topology.py</value>
</property>



You can find more details here : How to kick-off Hadoop rack awareness

Wednesday, April 22, 2009

More on Hadoop Part 1

Finally get into more details of Hadoop DFS ...

To check the file information on a file, do this


<$HADOOP_INSTALLATION_DIR>/bin/hadoop fsck



To change the replication factor of a file, do this


<$HADOOP_INSTALLATION_DIR>/bin/hadoop fs -setrep [-R]

the -R is for recursive for a directory



To start a NameNode/JobTracker on a node, do this

<$HADOOP_INSTALLATION_DIR>/bin/hadoop namenode
<$HADOOP_INSTALLATION_DIR>/bin/hadoop jobtracker



To start a DataNode/TaskTracker on a slave node, do this


<$HADOOP_INSTALLATION_DIR>/bin/hadoop datanode
<$HADOOP_INSTALLATION_DIR>/bin/hadoop tasktracker


To rebalance the block replication in a cluster, do this

<$HADOOP_INSTALLATION_DIR>/bin/hadoop balancer



Hadoop works in a "rack"-aware context, i.e. it assumed that nodes are a subset of a rack and a deployment will have multiple racks. This explained the policy of dfs.replication = 3 stating 'one replica on a node in the rack, another replica on a different node in the same rack, and the third on a different node in a different rack'. If not specify, the rackid is 'defaultrack'. Hadoop lets the cluster administrators decide which rack a node belongs to through configuration variable dfs.network.script. When this script is configured, each node runs the script to determine its rackid. See Hadoop JIRA HADOOP-692. Some reference material here: Rack_aware_HDFS_proposal.pdf

That's all for now. Continue to Hadoop ...

More on HBase Part 1

To view the metadata of the 'tables' stored in the HBase, use this command in the <$HBASE_INSTALLATION_DIR>/bin/hbase shell


# hbase(main): xx>scan '.META.'



look at how the tables are distributed over the regionservers from the column=info:server

To put value into a column family without label do this:

# hbase(main): xx>put 'table_name', 'row_key', 'column_family_name:', 'value'

To put value into a column family with a label do this:

# hbase(main): xx>put 'table_name', 'row_key', 'column_family_name:label_name', 'value'

To work with HBase, you have to throw away all SQL concepts. It is just not relational, it is distributed and scalable. One can dynamically add label to a column family as and when required. That means the rows in the 'table' are not of equal length.