HCatalog – Input Output Format

The HCatInputFormat and HCatOutputFormat interfaces are used to read data from HCatalog-managed tables (stored in HDFS) and, after processing, to write the resultant data back into HCatalog-managed tables from a MapReduce job. Let us look at each of these interfaces in detail.

HCatInputFormat

The HCatInputFormat is used with MapReduce jobs to read data from HCatalog-managed tables. HCatInputFormat exposes a Hadoop 0.20 MapReduce API for reading data as if it had been published to a table.

Sr.No.  Method Name & Description

1.  public static HCatInputFormat setInput(Job job, String dbName, String tableName) throws IOException
    Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks.

2.  public static HCatInputFormat setInput(Configuration conf, String dbName, String tableName) throws IOException
    Set inputs to use for the job. It queries the metastore with the given input specification and serializes matching partitions into the job configuration for MapReduce tasks.

3.  public HCatInputFormat setFilter(String filter) throws IOException
    Set a filter on the input table.

4.  public HCatInputFormat setProperties(Properties properties) throws IOException
    Set properties for the input format.
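
The two setInput overloads, setFilter, and setProperties listed above belong to the chainable HCatInputFormat that ships with the Hive-bundled HCatalog releases (package org.apache.hive.hcatalog.mapreduce). The following is a minimal sketch of restricting a job to a single partition with setFilter; the database, table, and partition column names are placeholders.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

public class FilteredInputSketch {
   public static void configure(Job job) throws Exception {
      // Register the input table and read only the ds="20130101" partition.
      // "default", "web_logs", and "ds" are placeholder names.
      HCatInputFormat.setInput(job, "default", "web_logs")
         .setFilter("ds=\"20130101\"");
      job.setInputFormatClass(HCatInputFormat.class);
   }
}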

The HCatInputFormat API includes the following methods −

  • setInput
  • setOutputSchema
  • getTableSchema

To use HCatInputFormat to read data, first instantiate an InputJobInfo with the necessary information from the table being read and then call setInput with the InputJobInfo.

You can use the setOutputSchema method to include a projection schema that specifies the output fields. If a schema is not specified, all the columns in the table are returned. You can use the getTableSchema method to determine the table schema for a specified input table.
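
Below is a minimal sketch of this read path, written against the same HCatalog 0.5 style API as the example program later in this chapter; the database and table names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadSetupSketch {
   public static void configure(Configuration conf) throws Exception {
      Job job = new Job(conf, "read-example");
      job.setInputFormatClass(HCatInputFormat.class);

      // Register the input table; the third argument is a partition filter
      // (null means read all partitions).
      HCatInputFormat.setInput(job,
         InputJobInfo.create("default", "employee", null));

      // Inspect the table schema, then pass a (possibly trimmed) projection
      // back via setOutputSchema. If setOutputSchema is never called,
      // every column of the table is returned.
      HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
      HCatInputFormat.setOutputSchema(job, tableSchema);
   }
}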

HCatOutputFormat

HCatOutputFormat is used with MapReduce jobs to write data to HCatalog-managed tables. HCatOutputFormat exposes a Hadoop 0.20 MapReduce API for writing data to a table. When a MapReduce job uses HCatOutputFormat to write output, the default OutputFormat configured for the table is used and the new partition is published to the table after the job completes.
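
For example, the partition to be published can be described by a map of partition key/value pairs passed to OutputJobInfo.create. The following is a minimal sketch; the database, table, and partition column names are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteSetupSketch {
   public static void configure(Job job) throws Exception {
      // Key/value pairs identifying the partition this job will publish;
      // passing null instead writes without a static partition specification,
      // as the example program below does.
      Map<String, String> partitionValues = new HashMap<String, String>();
      partitionValues.put("ds", "20130101");

      // setOutput must be the first HCatOutputFormat call on the job.
      HCatOutputFormat.setOutput(job,
         OutputJobInfo.create("default", "web_logs_summary", partitionValues));
      job.setOutputFormatClass(HCatOutputFormat.class);
   }
}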

Sr.No.  Method Name & Description

1.  public static void setOutput(Configuration conf, Credentials credentials, OutputJobInfo outputJobInfo) throws IOException
    Set the information about the output to write for the job. It queries the metadata server to find the StorageHandler to use for the table. It throws an error if the partition is already published.

2.  public static void setSchema(Configuration conf, HCatSchema schema) throws IOException
    Set the schema for the data being written out to the partition. The table schema is used by default for the partition if this is not called.

3.  public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException
    Get the record writer for the job. It uses the StorageHandler's default OutputFormat to get the record writer.

4.  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
    Get the output committer for this output format. It ensures that the output is committed correctly.

The HCatOutputFormat API includes the following methods −

  • setOutput
  • setSchema
  • getTableSchema

The first call on the HCatOutputFormat must be setOutput; any other call will throw an exception saying the output format is not initialized.

The schema for the data being written out is specified by the setSchema method. You must call this method, providing the schema of data you are writing. If your data has the same schema as the table schema, you can use HCatOutputFormat.getTableSchema() to get the table schema and then pass that along to setSchema().
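
Here is a minimal sketch of that ordering: setOutput first, then the table schema fetched with getTableSchema and handed to setSchema. The database and table names are placeholders.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class OutputSchemaSketch {
   public static void configure(Job job) throws Exception {
      // setOutput must come first; it looks up the table in the metastore.
      HCatOutputFormat.setOutput(job,
         OutputJobInfo.create("default", "summary_table", null));

      // When the records being written match the table layout, the table
      // schema itself can be handed straight to setSchema.
      HCatSchema schema = HCatOutputFormat.getTableSchema(job);
      HCatOutputFormat.setSchema(job, schema);
   }
}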

Example

The following MapReduce program reads data from one table, which it assumes to have an integer in its second column (“column 1”), and counts how many instances of each distinct value it finds. That is, it does the equivalent of “select col1, count(*) from $table group by col1;”.

For example, if the values in the second column are {1, 1, 1, 3, 3, 5}, then the program will produce the following output of values and counts −

1, 3
3, 2
5, 1

Let us now take a look at the program code −

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;

import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByAge extends Configured implements Tool {

   public static class Map extends Mapper<WritableComparable, 
      HCatRecord, IntWritable, IntWritable> {
      int age;
		
      @Override
      protected void map(
         WritableComparable key, HCatRecord value,
         org.apache.hadoop.mapreduce.Mapper<WritableComparable,
         HCatRecord, IntWritable, IntWritable>.Context context
      )throws IOException, InterruptedException {
         // The second column (index 1) holds the integer value to group by.
         age = (Integer) value.get(1);
         context.write(new IntWritable(age), new IntWritable(1));
      }
   }
	
   public static class Reduce extends Reducer<IntWritable, IntWritable,
      WritableComparable, HCatRecord> {
      @Override
      protected void reduce(
         IntWritable key, java.lang.Iterable<IntWritable> values,
         org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
         WritableComparable, HCatRecord>.Context context
      )throws IOException ,InterruptedException {
         int sum = 0;
         Iterator<IntWritable> iter = values.iterator();
			
         while (iter.hasNext()) {
            sum++;
            iter.next();
         }
			
         // Emit one output record per distinct value: (value, count).
         HCatRecord record = new DefaultHCatRecord(2);
         record.set(0, key.get());
         record.set(1, sum);
         context.write(null, record);
      }
   }
	
   public int run(String[] args) throws Exception {
      Configuration conf = getConf();
      args = new GenericOptionsParser(conf, args).getRemainingArgs();
		
      String serverUri = args[0];
      String inputTableName = args[1];
      String outputTableName = args[2];
      String dbName = null;
      String principalID = System.getProperty(HCatConstants.HCAT_METASTORE_PRINCIPAL);
      if (principalID != null) {
         conf.set(HCatConstants.HCAT_METASTORE_PRINCIPAL, principalID);
      }
      Job job = new Job(conf, "GroupByAge");
      HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

      // Read the input table with HCatInputFormat.
      job.setInputFormatClass(HCatInputFormat.class);
      job.setJarByClass(GroupByAge.class);
      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);
		
      job.setMapOutputKeyClass(IntWritable.class);
      job.setMapOutputValueClass(IntWritable.class);
      job.setOutputKeyClass(WritableComparable.class);
      job.setOutputValueClass(DefaultHCatRecord.class);
		
      // Initialize HCatOutputFormat; null means no static partition values are specified.
      HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
      HCatSchema s = HCatOutputFormat.getTableSchema(job);
      System.err.println("INFO: output schema explicitly set for writing:" + s);
      HCatOutputFormat.setSchema(job, s);
      job.setOutputFormatClass(HCatOutputFormat.class);
      return (job.waitForCompletion(true) ? 0 : 1);
   }
	
   public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new GroupByAge(), args);
      System.exit(exitCode);
   }
}

Before compiling the above program, you have to download a few jars and add them to the classpath for this application. You need to download all the Hive jars and HCatalog jars (hcatalog-core-0.5.0.jar, hive-metastore-0.10.0.jar, libthrift-0.7.0.jar, hive-exec-0.10.0.jar, libfb303-0.7.0.jar, jdo2-api-2.3-ec.jar, slf4j-api-1.6.1.jar).

Use the following commands to copy those jar files from local to HDFS and add those to the classpath.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,\
hdfs:///tmp/hive-metastore-0.10.0.jar,\
hdfs:///tmp/libthrift-0.7.0.jar,\
hdfs:///tmp/hive-exec-0.10.0.jar,\
hdfs:///tmp/libfb303-0.7.0.jar,\
hdfs:///tmp/jdo2-api-2.3-ec.jar,\
hdfs:///tmp/slf4j-api-1.6.1.jar

Use the following command to execute the program after compiling it (with the jars listed above on the classpath) and packaging it as GroupByAge.jar. The -libjars option, which GenericOptionsParser understands, ships the dependency jars with the job; the remaining arguments are the metastore URI, the input table, and the output table.

$HADOOP_HOME/bin/hadoop jar GroupByAge.jar GroupByAge -libjars $LIB_JARS <serverUri> <inputTableName> <outputTableName>

Now, check the directory of the output table on HDFS for the part files containing the results, or simply query the output table from Hive.
