MapReduce – Partitioner

A partitioner works like a condition in processing an input dataset: it decides which Reducer receives each intermediate key-value pair. The partitioning phase takes place after the Map phase and before the Reduce phase.

The number of partitions is equal to the number of reducers. That means a partitioner divides the data according to the number of reducers, and the data belonging to a single partition is processed by a single Reducer.

Partitioner

A partitioner partitions the key-value pairs of intermediate Map-outputs. It partitions the data using a user-defined condition, which works like a hash function. The total number of partitions is the same as the number of Reducer tasks for the job. Let us take an example to understand how the partitioner works.
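
For reference, when no custom partitioner is set, Hadoop falls back to its default hash-based partitioning, which routes each key to a reducer by its hash code. A minimal sketch of that default logic (the class name here is illustrative) looks like this −

import org.apache.hadoop.mapreduce.Partitioner;
 
//Sketch of Hadoop's default hash-based partitioning: each key goes to the
//reducer given by its hash code modulo the number of reduce tasks.
public class HashLikePartitioner<K, V> extends Partitioner<K, V>
{
   @Override
   public int getPartition(K key, V value, int numReduceTasks)
   {
      //Mask the sign bit so the partition index is never negative.
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
   }
}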

MapReduce Partitioner Implementation

For the sake of convenience, let us assume we have a small table called Employee with the following data. We will use this sample data as our input dataset to demonstrate how the partitioner works.

Id      Name       Age   Gender   Salary
1201    gopal      45    Male     50,000
1202    manisha    40    Female   50,000
1203    khalil     34    Male     30,000
1204    prasanth   30    Male     30,000
1205    kiran      20    Male     40,000
1206    laxmi      25    Female   35,000
1207    bhavya     20    Female   15,000
1208    reshma     19    Female   15,000
1209    kranthi    22    Male     22,000
1210    Satish     24    Male     25,000
1211    Krishna    25    Male     25,000
1212    Arshad     28    Male     20,000
1213    lavanya    18    Female   8,000

We have to write an application to process the input dataset to find the highest salaried employee by gender in different age groups (for example, below 20, between 21 and 30, above 30).

Input Data

The above data is saved as input.txt (with tab-separated fields) in the “/home/hadoop/hadoopPartitioner” directory and given as input.

1201    gopal      45    Male     50000
1202    manisha    40    Female   51000
1203    khaleel    34    Male     30000
1204    prasanth   30    Male     31000
1205    kiran      20    Male     40000
1206    laxmi      25    Female   35000
1207    bhavya     20    Female   15000
1208    reshma     19    Female   14000
1209    kranthi    22    Male     22000
1210    Satish     24    Male     25000
1211    Krishna    25    Male     26000
1212    Arshad     28    Male     20000
1213    lavanya    18    Female   8000

Based on the given input, the following is the algorithmic explanation of the program.

Map Tasks

The map task accepts key-value pairs as input, while the text data resides in a text file. The input for this map task is as follows −

Input − The key is the byte offset of the line within the file (a LongWritable provided by TextInputFormat) and the value is the data in that line (example: value = 1201 \t gopal \t 45 \t Male \t 50000).

Method − The operation of this map task is as follows −

  • Read the value (the record data), which comes as the input value from the argument list, into a string.
  • Using the split function, separate the gender and store in a string variable.
String[] str = value.toString().split("\t", -3);
String gender=str[3];
  • Send the gender information and the record data value as output key-value pair from the map task to the partition task.
context.write(new Text(gender), new Text(value));
  • Repeat all the above steps for all the records in the text file.

Output − You will get the gender data and the record data value as key-value pairs.
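
For example, the first two records of the input file would be emitted from the map task as the following intermediate key-value pairs (tabs shown as \t) −

Male     1201 \t gopal \t 45 \t Male \t 50000
Female   1202 \t manisha \t 40 \t Female \t 51000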

Partitioner Task

The partitioner task accepts the key-value pairs from the map task as its input. Partition implies dividing the data into segments. According to the given conditional criteria, the input key-value data is divided into three parts based on the age criteria.

Input − The whole data in a collection of key-value pairs.

key = Gender field value in the record.

value = Whole record data value of that gender.

Method − The partition logic runs as follows.

  • Read the age field value from the input key-value pair.
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
  • Check the age value with the following conditions.
  • Age less than or equal to 20.
  • Age greater than 20 and less than or equal to 30.
  • Age greater than 30.
if(age<=20)
{
   return 0;
}
else if(age>20 && age<=30)
{
   return 1 % numReduceTasks;
}
else
{
   return 2 % numReduceTasks;
}

Output − The whole data of key-value pairs is segmented into three collections of key-value pairs. The Reducer works individually on each collection.
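
For the given input, applying the age conditions above distributes the records as follows −

Partition 0 (age <= 20)        − kiran, bhavya, reshma, lavanya
Partition 1 (age 21 to 30)     − laxmi, kranthi, Satish, Krishna, Arshad, prasanth
Partition 2 (age above 30)     − gopal, manisha, khaleel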

Reduce Tasks

The number of partitions is equal to the number of reducer tasks. Here we have three partitions and hence three Reducer tasks to be executed.

Input − The Reducer will execute three times with a different collection of key-value pairs.

key = gender field value in the record.

value = the whole record data of that gender.

Method − The following logic will be applied to each collection.

  • Read the Salary field value of each record.
String [] str = val.toString().split("\t", -3);
Note: str[4] holds the salary field value.
  • Compare the salary with the max variable. If str[4] is greater than the current max, assign str[4] to max; otherwise skip the step.
if(Integer.parseInt(str[4])>max)
{
   max=Integer.parseInt(str[4]);
}
  • Repeat Steps 1 and 2 for each key collection (Male & Female are the key collections). After executing these three steps, you will find one max salary from the Male key collection and one max salary from the Female key collection.
context.write(new Text(key), new IntWritable(max));

Output − Finally, you will get a set of key-value pair data in three collections of different age groups. It contains the max salary from the Male collection and the max salary from the Female collection in each age group respectively.

After executing the Map, the Partitioner, and the Reduce tasks, the three collections of key-value pair data are stored in three different files as the output.

All three tasks together form a single MapReduce job. The following requirements and specifications of the job should be specified in the Configuration −

  • Job name
  • Input and Output formats of keys and values
  • Individual classes for Map, Reduce, and Partitioner tasks
Configuration conf = getConf();
 
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
 
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
 
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
 
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
 
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
 
//Number of Reducer tasks.
job.setNumReduceTasks(3);
 
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
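
Note that the Job(Configuration, String) constructor used above is deprecated on Hadoop 2.x and later; an equivalent way to create the same job (a minimal sketch, keeping the class and job names from above) is −

//On newer Hadoop versions, create the Job through the static factory method
//instead of the deprecated constructor.
Job job = Job.getInstance(conf, "topsal");
job.setJarByClass(PartitionerExample.class);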

Example Program

The following program shows how to implement the partitioners for the given criteria in a MapReduce program.

package partitionerexample;
 
import java.io.*;
 
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
 
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
 
import org.apache.hadoop.util.*;
 
public class PartitionerExample extends Configured implements Tool
{
   //Map class
       
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
   {
      public void map(LongWritable key, Text value, Context context)
      {
         try{
            String[] str = value.toString().split("\t", -3);
            String gender=str[3];
            context.write(new Text(gender), new Text(value));
         }
         catch(Exception e)
         {
            System.out.println(e.getMessage());
         }
      }
   }
   
   //Reducer class
       
   public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
   {
      public int max = -1;
      public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
      {
         max = -1;
                     
         for (Text val : values)
         {
            String [] str = val.toString().split("\t", -3);
            if(Integer.parseInt(str[4])>max)
            max=Integer.parseInt(str[4]);
         }
                     
         context.write(new Text(key), new IntWritable(max));
      }
   }
   
   //Partitioner class
       
   public static class CaderPartitioner extends Partitioner<Text, Text>
   {
      @Override
      public int getPartition(Text key, Text value, int numReduceTasks)
      {
         String[] str = value.toString().split("\t");
         int age = Integer.parseInt(str[2]);
         
         if(numReduceTasks == 0)
         {
            return 0;
         }
         
         if(age<=20)
         {
            return 0;
         }
         else if(age>20 && age<=30)
         {
            return 1 % numReduceTasks;
         }
         else
         {
            return 2 % numReduceTasks;
         }
      }
   }
   
   @Override
   public int run(String[] arg) throws Exception
   {
      Configuration conf = getConf();
              
      Job job = new Job(conf, "topsal");
      job.setJarByClass(PartitionerExample.class);
              
      FileInputFormat.setInputPaths(job, new Path(arg[0]));
      FileOutputFormat.setOutputPath(job,new Path(arg[1]));
              
      job.setMapperClass(MapClass.class);
              
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      //set partitioner statement
              
      job.setPartitionerClass(CaderPartitioner.class);
      job.setReducerClass(ReduceClass.class);
      job.setNumReduceTasks(3);
      job.setInputFormatClass(TextInputFormat.class);
              
      job.setOutputFormatClass(TextOutputFormat.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
              
      return job.waitForCompletion(true) ? 0 : 1;
   }
   
   public static void main(String ar[]) throws Exception
   {
      int res = ToolRunner.run(new Configuration(), new PartitionerExample(), ar);
      System.exit(res);
   }
}

Save the above code as PartitionerExample.java in “/home/hadoop/hadoopPartitioner”. The compilation and execution of the program are given below.

Compilation and Execution

Let us assume we are in the home directory of the Hadoop user (for example, /home/hadoop).

Follow the steps given below to compile and execute the above program.

Step 1 − Download hadoop-core-1.2.1.jar, which is used to compile and execute the MapReduce program. You can download the jar from mvnrepository.com.

Let us assume the downloaded folder is “/home/hadoop/hadoopPartitioner”.

Step 2 − The following commands are used for compiling the program PartitionerExample.java and creating a jar for the program.

$ javac -classpath hadoop-core-1.2.1.jar -d . PartitionerExample.java
$ jar -cvf PartitionerExample.jar -C . partitionerexample

Step 3 − Use the following command to create an input directory in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 4 − Use the following command to copy the input file named input.txt to the input directory of HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir

Step 5 − Use the following command to verify the files in the input directory.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 6 − Use the following command to run the Top salary application by taking input files from the input directory.

$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir

Wait a while for the job to execute. After execution, the output reports the number of input splits, Map tasks, and Reducer tasks.

15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
 
File System Counters
 
   FILE: Number of bytes read=467
   FILE: Number of bytes written=426777
   FILE: Number of read operations=0
   FILE: Number of large read operations=0
   FILE: Number of write operations=0
       
   HDFS: Number of bytes read=480
   HDFS: Number of bytes written=72
   HDFS: Number of read operations=12
   HDFS: Number of large read operations=0
   HDFS: Number of write operations=6
       
Job Counters
 
   Launched map tasks=1
   Launched reduce tasks=3
       
   Data-local map tasks=1
       
   Total time spent by all maps in occupied slots (ms)=8212
   Total time spent by all reduces in occupied slots (ms)=59858
   Total time spent by all map tasks (ms)=8212
   Total time spent by all reduce tasks (ms)=59858
       
   Total vcore-seconds taken by all map tasks=8212
   Total vcore-seconds taken by all reduce tasks=59858
       
   Total megabyte-seconds taken by all map tasks=8409088
   Total megabyte-seconds taken by all reduce tasks=61294592
       
Map-Reduce Framework
 
   Map input records=13
   Map output records=13
   Map output bytes=423
   Map output materialized bytes=467
       
   Input split bytes=119
       
   Combine input records=0
   Combine output records=0
       
   Reduce input groups=6
   Reduce shuffle bytes=467
   Reduce input records=13
   Reduce output records=6
       
   Spilled Records=26
   Shuffled Maps =3
   Failed Shuffles=0
   Merged Map outputs=3
   GC time elapsed (ms)=224
   CPU time spent (ms)=3690
       
   Physical memory (bytes) snapshot=553816064
   Virtual memory (bytes) snapshot=3441266688
       
   Total committed heap usage (bytes)=334102528
       
Shuffle Errors
 
   BAD_ID=0
   CONNECTION=0
   IO_ERROR=0
       
   WRONG_LENGTH=0
   WRONG_MAP=0
   WRONG_REDUCE=0
       
File Input Format Counters
 
   Bytes Read=361
       
File Output Format Counters
 
   Bytes Written=72

Step 7 − Use the following command to verify the resultant files in the output folder.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

You will find the output in three files because you are using three partitions and three Reducers in your program.

Step 8 − Use the following command to see the output in the Part-00000 file. This file is written to HDFS by the first Reducer.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Output in Part-00000

Female   15000
Male     40000

Use the following command to see the output in the Part-00001 file.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001

Output in Part-00001

Female   35000
Male     31000

Use the following command to see the output in the Part-00002 file.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002

Output in Part-00002

Female   51000
Male     50000
