Wednesday, January 6, 2010

TF-IDF in Hadoop Part 2: Word Counts For Docs

The TF-IDF algorithm can be implemented in different ways. The Cloudera Hadoop training breaks the implementation into steps, each handled by a separate Job. I decided to persist the intermediate data of each step before executing the next one. This part documents the implementation of Job 2, the second part of my experiments with Hadoop.

Part 1 resulted in the word frequency for each of the documents in the input path provided, persisted at the "1-word-freq" output directory, as shown below:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 1-word-freq/part-r-00000 | less
...
therefore@all-shakespeare 652
therefore@leornardo-davinci-all.txt 124
therefore@the-outline-of-science-vol1.txt 36

The definition of Job 2 will take into account the structure of this data in the creation of the Mapper and Reducer classes.

Job 2: Word Counts for Docs

The goal of this job is to count the total number of words in each document, so that each word's count can be compared with that total. I tried to implement a custom InputFormat, but I couldn't find examples related to it. As I understand it, the values could be read back in the same format they were saved in (Text, IntWritable), but I will keep it simple and use the same default InputFormat as before. Following the same definition as in Part 1, the specification of the Map and Reduce is as follows:
  • Map:
    • Input: ((word@document), n)
    • Re-arrange the mapper to have the key based on each document
    • Output: (document, word=n)
  • Reducer
    • N = totalWordsInDoc = sum of n over every (word=n) value of the document
    • Output: ((word@document), (n/N))
Note that the input format of the mapper is the output format of the previous job. The delimiters "@" and "/" were picked arbitrarily to better represent the intent of the data, so feel free to pick anything you prefer. The reducer just needs to sum the counts of all the words in a document and pass this total on to the next step, along with each word's own count, since both are needed there.
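
To make the specification above concrete, here is a minimal plain-Java sketch of the same data flow, with no Hadoop classes involved. The class and method names (WordCountsForDocsSketch, relativeCounts) are made up for this illustration, and it assumes Java 8 or later and tab-separated input lines like the ones Job 1 writes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java walk-through of Job 2; a sketch, not the Hadoop implementation. */
final class WordCountsForDocsSketch {

    /** Turns Job 1 lines ("word@document<TAB>n") into "word@document<TAB>n/N" lines. */
    static List<String> relativeCounts(List<String> jobOneLines) {
        // "Map" step: regroup the records by document, keeping word -> n.
        Map<String, Map<String, Integer>> wordsByDoc = new LinkedHashMap<>();
        for (String line : jobOneLines) {
            String[] keyAndCount = line.split("\t");
            String[] wordAndDoc = keyAndCount[0].split("@");
            wordsByDoc.computeIfAbsent(wordAndDoc[1], doc -> new LinkedHashMap<>())
                      .put(wordAndDoc[0], Integer.parseInt(keyAndCount[1]));
        }
        // "Reduce" step: N is the sum of every n that belongs to one document.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Map<String, Integer>> doc : wordsByDoc.entrySet()) {
            int total = 0;
            for (int n : doc.getValue().values()) {
                total += n;
            }
            for (Map.Entry<String, Integer> word : doc.getValue().entrySet()) {
                out.add(word.getKey() + "@" + doc.getKey() + "\t" + word.getValue() + "/" + total);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "car@a.txt\t50", "hadoop@a.txt\t15", "algorithms@a.txt\t25", "car@b.txt\t2");
        for (String line : relativeCounts(input)) {
            System.out.println(line);
        }
    }
}
```

The real job spreads these two steps across the cluster, but the per-document arithmetic is the same.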

I have learned that the Iterable passed as the values of the Reducer class can't be iterated more than once. When I used two for-each loops, the second one simply never entered its body, so I implemented the reducer using a temporary map.
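
The single-pass behavior can be reproduced without Hadoop. In the sketch below, singlePass is a hypothetical stand-in for the reducer's values: every call to iterator() hands back the same, already-advancing iterator, which is an assumption made to mimic what I observed. Buffering the values during the first pass is the workaround. Hadoop also reuses the same value object between iterations, so a real reducer should buffer copies of the contents (for example the String form), not the Text references themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Shows why a one-shot Iterable needs buffering; names here are illustrative only. */
final class SinglePassSketch {

    /** Hypothetical stand-in for the reducer's values: one shared iterator for all loops. */
    static <T> Iterable<T> singlePass(List<T> items) {
        final Iterator<T> shared = items.iterator();
        return () -> shared;
    }

    /** Counts how many elements a for-each loop over the iterable visits. */
    static <T> int countElements(Iterable<T> values) {
        int count = 0;
        for (T ignored : values) {
            count++;
        }
        return count;
    }

    /** Copies the values into a list during the one pass that is available. */
    static <T> List<T> buffer(Iterable<T> values) {
        List<T> copy = new ArrayList<>();
        for (T value : values) {
            copy.add(value);
        }
        return copy;
    }

    public static void main(String[] args) {
        Iterable<String> values = singlePass(Arrays.asList("car=50", "hadoop=15"));
        System.out.println("first loop:  " + countElements(values));
        System.out.println("second loop: " + countElements(values));

        List<String> buffered = buffer(singlePass(Arrays.asList("car=50", "hadoop=15")));
        System.out.println("buffered, second loop: " + countElements(buffered));
    }
}
```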

Job2, Mapper

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
* WordCountsForDocsMapper re-keys each (word@document, n) record by its document.
*/
public class WordCountsForDocsMapper extends Mapper<LongWritable, Text, Text, Text> {
public WordCountsForDocsMapper() {
}
/**
* @param key is the byte offset of the current line in the file;
* @param value is the line from the file
* @param context
*
* PRE-CONDITION: aa@leornardo-davinci-all.txt 1
* aaron@all-shakespeare 98
* ab@leornardo-davinci-all.txt 3
*
* POST-CONDITION: Output <"all-shakespeare", "aaron=98"> pairs
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] wordAndDocCounter = value.toString().split("\t");
String[] wordAndDoc = wordAndDocCounter[0].split("@");
context.write(new Text(wordAndDoc[1]), new Text(wordAndDoc[0] + "=" + wordAndDocCounter[1]));
}
}

Job2, Mapper Unit Test

I have simplified the unit test to just verify that the Mapper generates the format needed by the Reducer.

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
* Test cases for the word count mapper.
*/
public class WordCountsForDocsMapperTest extends TestCase {

private Mapper<LongWritable, Text, Text, Text> mapper;
private MapDriver<LongWritable, Text, Text, Text> driver;

@Before
public void setUp() {
mapper = new WordCountsForDocsMapper();
driver = new MapDriver(mapper);
}

@Test
public void testMultiWords() {
List<Pair<Text, Text>> out = null;

try {
out = driver.withInput(new LongWritable(0), new Text("crazy@all-shakespeare\t25")).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("all-shakespeare"), new Text("crazy=25")));
assertListEquals(expected, out);
}
}

Job 2, Reducer

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
* WordCountsForDocsReducer counts the total number of words in each document.
*/
public class WordCountsForDocsReducer extends Reducer<Text, Text, Text, Text> {

public WordCountsForDocsReducer() {
}

/**
* @param key is the key of the mapper
* @param values are all the values aggregated during the mapping phase
* @param context contains the context of the job run
*
* PRE-CONDITION: receive a list of
* pairs <"a.txt", ["word1=3", "word2=5", "word3=5"]>
*
* POST-CONDITION: <"word1@a.txt", "3/13">,
* <"word2@a.txt", "5/13">
*/
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sumOfWordsInDocument = 0;
// the values can only be iterated once, so buffer each word=n pair while summing
Map<String, Integer> tempCounter = new HashMap<String, Integer>();
for (Text val : values) {
String[] wordCounter = val.toString().split("=");
int count = Integer.parseInt(wordCounter[1]);
tempCounter.put(wordCounter[0], count);
sumOfWordsInDocument += count;
}
// emit <word@document, n/N> for every buffered word
for (Map.Entry<String, Integer> entry : tempCounter.entrySet()) {
context.write(new Text(entry.getKey() + "@" + key.toString()), new Text(entry.getValue() + "/"
+ sumOfWordsInDocument));
}
}
}

Job 2, Reducer Unit Test

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
* Test cases for the reducer of the word counts.
*/
public class WordCountsForDocsReducerTest extends TestCase {

private Reducer<Text, Text, Text, Text> reducer;
private ReduceDriver<Text, Text, Text, Text> driver;

@Before
public void setUp() {
reducer = new WordCountsForDocsReducer();
driver = new ReduceDriver(reducer);
}

@Test
public void testMultiWords() {
List<Pair<Text, Text>> out = null;

try {
List<Text> values = new ArrayList<Text>();
values.add(new Text("car=50"));
values.add(new Text("hadoop=15"));
values.add(new Text("algorithms=25"));
out = driver.withInput(new Text("document"), values).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("car@document"), new Text("50/90")));
expected.add(new Pair<Text, Text>(new Text("hadoop@document"), new Text("15/90")));
expected.add(new Pair<Text, Text>(new Text("algorithms@document"), new Text("25/90")));
assertListEquals(expected, out);
}

}

Once again, following our Test-Driven Development approach, let's test the Mapper and Reducer classes to verify the "correctness" of the generated data. The JUnit 4 test suite is updated as follows:

// (c) Copyright 2009 Cloudera, Inc.
// Updated by Marcello de Sales (marcello.dsales@gmail.com)
package index;

import junit.framework.Test;
import junit.framework.TestSuite;

/**
* All tests for inverted index code
*
* @author aaron
*/
public final class AllTests {

private AllTests() { }

public static Test suite() {
TestSuite suite = new TestSuite("Tests for the TF-IDF algorithm");

suite.addTestSuite(WordFreqMapperTest.class);
suite.addTestSuite(WordFreqReducerTest.class);
suite.addTestSuite(WordCountsForDocsMapperTest.class);
suite.addTestSuite(WordCountsForDocsReducerTest.class);

return suite;
}

}

Run it with the Ant "test" task, defined in the build.xml file:

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml

compile:
[javac] Compiling 12 source files to /home/training/git/exercises/shakespeare/bin

test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 7, Failures: 0, Errors: 0, Time elapsed: 0.424 sec
[junit] Tests run: 7, Failures: 0, Errors: 0, Time elapsed: 0.424 sec
[junit]

BUILD SUCCESSFUL

As in Part 1, it is safer to execute the Driver once the classes are tested. The Driver includes the definitions of the mapper and reducer classes. Unlike in Part 1, no combiner class is set, because this reducer changes the record format (from (document, word=n) to (word@document, n/N)) and so cannot be reused as a combiner. Also, note that the outputKeyClass and outputValueClass are the same as the ones defined by the Reducer class! Once again, Hadoop complains when they are different :)

Job2, Driver

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* WordCountsInDocuments counts the total number of words in each document and
* produces data with the relative and total number of words for each document.
*/
public class WordCountsInDocuments extends Configured implements Tool {

// where to put the data in hdfs when we're done
private static final String OUTPUT_PATH = "2-word-counts";

// where to read the data from.
private static final String INPUT_PATH = "1-word-freq";

public int run(String[] args) throws Exception {

Configuration conf = getConf();
Job job = new Job(conf, "Words Counts");

job.setJarByClass(WordCountsInDocuments.class);
job.setMapperClass(WordCountsForDocsMapper.class);
job.setReducerClass(WordCountsForDocsReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCountsInDocuments(), args);
System.exit(res);
}
}

The input data is located in the directory of the first step, "1-word-freq", and the output is persisted in the directory "2-word-counts", as listed in the main training directory in the HDFS. If you need to take a look at the Ant build and other classes, go to my personal resources at my Google Code project. Recompile the project and generate the updated Jar with the driver.

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml

compile:
[javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.


jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar


BUILD SUCCESSFUL
Total time: 1 second

Now, executing the driver...

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordCountsInDocuments
10/01/06 16:28:04 INFO input.FileInputFormat: Total input paths to process : 1
10/01/06 16:28:04 INFO mapred.JobClient: Running job: job_200912301017_0048
10/01/06 16:28:05 INFO mapred.JobClient: map 0% reduce 0%
10/01/06 16:28:12 INFO mapred.JobClient: map 100% reduce 0%
10/01/06 16:28:18 INFO mapred.JobClient: map 100% reduce 100%
10/01/06 16:28:20 INFO mapred.JobClient: Job complete: job_200912301017_0048
10/01/06 16:28:20 INFO mapred.JobClient: Counters: 17
10/01/06 16:28:20 INFO mapred.JobClient: Job Counters
10/01/06 16:28:20 INFO mapred.JobClient: Launched reduce tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: Launched map tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: Data-local map tasks=1
10/01/06 16:28:20 INFO mapred.JobClient: FileSystemCounters
10/01/06 16:28:20 INFO mapred.JobClient: FILE_BYTES_READ=1685803
10/01/06 16:28:20 INFO mapred.JobClient: HDFS_BYTES_READ=1588239
10/01/06 16:28:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=3371638
10/01/06 16:28:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1920431
10/01/06 16:28:20 INFO mapred.JobClient: Map-Reduce Framework
10/01/06 16:28:20 INFO mapred.JobClient: Reduce input groups=0
10/01/06 16:28:20 INFO mapred.JobClient: Combine output records=0
10/01/06 16:28:20 INFO mapred.JobClient: Map input records=48779
10/01/06 16:28:20 INFO mapred.JobClient: Reduce shuffle bytes=1685803
10/01/06 16:28:20 INFO mapred.JobClient: Reduce output records=0
10/01/06 16:28:20 INFO mapred.JobClient: Spilled Records=97558
10/01/06 16:28:20 INFO mapred.JobClient: Map output bytes=1588239
10/01/06 16:28:20 INFO mapred.JobClient: Combine input records=0
10/01/06 16:28:20 INFO mapred.JobClient: Map output records=48779
10/01/06 16:28:20 INFO mapred.JobClient: Reduce input records=48779

Note that the execution shuffles 48,779 records (roughly 1.6 MB of map output) from the map phase to the reducer. Let's check the result using the hadoop fs -cat command once again and navigate through it. The most important thing to note is that the relation n/N is maintained throughout the results: each word keeps its own count n next to the total number of words N of its document.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 2-word-counts/part-r-00000 | less
....
relished@all-shakespeare 1/738781
therefore@all-shakespeare 652/738781
eastward@all-shakespeare 1/738781
....
irrespective@leornardo-davinci-all.txt 1/149612
ignorance@leornardo-davinci-all.txt 12/149612
drawing@leornardo-davinci-all.txt 174/149612
relief@leornardo-davinci-all.txt 36/149612
...
answer@the-outline-of-science-vol1.txt 25/70650
sleeve@the-outline-of-science-vol1.txt 1/70650
regard@the-outline-of-science-vol1.txt 22/70650

Part 3 will conclude this series by combining the two remaining steps. I'm still using the original basic tutorial from Cloudera, but with the Hadoop 0.20.1 API. Any suggestions for improvements are welcome:

- How to write data pipes between 2 different jobs?
- How to write a custom input format?

Those questions might be answered during the Hadoop training in Sunnyvale on January 19-21, which I'm excited to attend.

Tuesday, January 5, 2010

TF-IDF in Hadoop Part 1: Word Frequency in Doc

My interest in parallel computing dates back to my undergraduate studies, just one or two years after Google published its paper about efficient data processing. Since then, I have wondered how they manage to index "the web".

This code uses the Hadoop 0.20.1 API.

Seven years passed and, while writing my thesis project, I started dealing with the same questions regarding large datasets... How to process them at the database level? I mean, how to process them efficiently with the computational resources you've got? mongoDB gave me at least parallel data processing through its database partitioning scheme, with database shards in a cluster, where the data is stored in different shards depending on different properties of the data. And of course, one of the tools to process the distributed data is a MapReduce API. The problem is that mongoDB is in its early stages, with only a few people running it in their production environments, while the community members support each other through the user and developer lists. The documentation on MapReduce is very simple, but it definitely requires some background...

I finally found the Cloudera basic introduction training on MapReduce and Hadoop... and let me tell you, they made the nicest introduction to MapReduce I've seen :) The slides and documentation are very well structured and nice to follow... The authors actually worked at Google and the University of Washington to get to that level... I was very pleased to read it and understand the concept... My only need at that time was to use that knowledge with the MapReduce engine from mongoDB... I did a simple application and it proved to be interesting...

So, I've been studying the Cloudera basic training in Hadoop, and that was the only way I could learn MapReduce! That is my suggestion to anyone with the same desire I had: learn the programming model behind the scenes... The first implementation I did with Hadoop was the indexing of the words of the complete Shakespeare collection, as well as of a Da Vinci book and a science book downloaded from Project Gutenberg. The input directory includes the collection of all Shakespeare books in a single text file. You can add the downloaded files to the Hadoop File System by using the copyFromLocal command:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -copyFromLocal the-outline-of-science-vol1.txt input
training@training-vm:~/git/exercises/shakespeare$ hadoop fs -copyFromLocal leornardo-davinci-all.txt input

You can verify if the files were added by listing the contents of the "input" directory.


training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls input
Found 3 items
-rw-r--r-- 1 training supergroup 5342761 2009-12-30 11:57 /user/training/input/all-shakespeare
-rw-r--r-- 1 training supergroup 1427769 2010-01-04 17:42 /user/training/input/leornardo-davinci-all.txt
-rw-r--r-- 1 training supergroup 674762 2010-01-04 17:42 /user/training/input/the-outline-of-science-vol1.txt





I started learning the API and the HDFS, as well as exploring the implementation of the TF-IDF algorithm, as explained by the Cloudera training. I started this implementation after I implemented the InvertedIndex example using both the Hadoop 0.18 and the 0.20.1 APIs. The parts of my experiments are defined as follows:

Part 1: implements the "Job 1: Word Frequency in Doc". This is documented in this post;
Part 2: implements the "Job 2: Word Counts for Docs";
Part 3: "Job 3: Word Frequency In Corpus and Job 4: Calculate TF-IDF";

Following the suggestion of the documentation, the approach I took to easily understand the concepts was to divide and conquer. Each of the jobs is executed separately as an exercise, saving the generated reduced values into the HDFS.

Job 1: Word Frequency in Doc

As mentioned before, the word frequency phase is designed in a Job whose task is to count the number of words in each of the documents in the input directory. In this case, the specification of the Map and Reduce are as follows:
  • Map:
    • Input: (document, each line contents)
    • Output: ((word@document), 1)
  • Reducer
    • n = sum of the values for each key "word@document"
    • Output: ((word@document), n)
In order to decrease the payload received by the reducers, I'm discarding very-high-frequency words such as "the", using Google's stopwords list. Also, the result of each job, which is the intermediate data for the next job, is saved to a regular file before the next MapReduce pass. In general, the strategy is:

  1. Reduce the output of the map phase by using lower-case values, because they will be aggregated before the reduce phase;
  2. Skip unnecessary words by checking them against the stopwords dictionary (Google search stopwords);
  3. Use RegEx to select only words, removing punctuation and other data anomalies;
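
The three steps above can be sketched in plain Java, outside of Hadoop. TokenizerSketch and its short stopword list are assumptions made for this illustration only; the mapper listed next uses the longer Google list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Lower-case, stopword and regex filtering of one line; a sketch, not the mapper itself. */
final class TokenizerSketch {

    // Step 3: \w+ selects runs of letters, digits and underscores, dropping punctuation.
    private static final Pattern WORD = Pattern.compile("\\w+");

    // Step 2: abbreviated stopword list, for illustration only.
    private static final Set<String> STOPWORDS =
            new HashSet<>(Arrays.asList("a", "and", "of", "the", "to"));

    static List<String> tokens(String line) {
        List<String> result = new ArrayList<>();
        Matcher matcher = WORD.matcher(line);
        while (matcher.find()) {
            // Step 1: lower-case so that "The" and "the" become the same key.
            String word = matcher.group().toLowerCase(Locale.ROOT);
            boolean startsWithLetter = Character.isLetter(word.charAt(0));
            if (!startsWithLetter || word.contains("_") || STOPWORDS.contains(word)) {
                continue;
            }
            result.add(word);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tokens("The Tragedy of Hamlet, Prince of Denmark! 1603"));
    }
}
```
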
Job1, Mapper

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;


import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;


/**
* WordFrequenceInDocMapper implements the Job 1 specification for the TF-IDF algorithm
*/
public class WordFrequenceInDocMapper extends Mapper<LongWritable, Text, Text, IntWritable> {


public WordFrequenceInDocMapper() {
}


/**
* Google's search Stopwords
*/
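private static Set<String> googleStopwords;


static {
googleStopwords = new HashSet<String>();
// stopwords are stored in lower case because map() lower-cases every word before the lookup
googleStopwords.add("i"); googleStopwords.add("a"); googleStopwords.add("about");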
googleStopwords.add("an"); googleStopwords.add("are"); googleStopwords.add("as");
googleStopwords.add("at"); googleStopwords.add("be"); googleStopwords.add("by");
googleStopwords.add("com"); googleStopwords.add("de"); googleStopwords.add("en");
googleStopwords.add("for"); googleStopwords.add("from"); googleStopwords.add("how");
googleStopwords.add("in"); googleStopwords.add("is"); googleStopwords.add("it");
googleStopwords.add("la"); googleStopwords.add("of"); googleStopwords.add("on");
googleStopwords.add("or"); googleStopwords.add("that"); googleStopwords.add("the");
googleStopwords.add("this"); googleStopwords.add("to"); googleStopwords.add("was");
googleStopwords.add("what"); googleStopwords.add("when"); googleStopwords.add("where");
googleStopwords.add("who"); googleStopwords.add("will"); googleStopwords.add("with");
googleStopwords.add("and"); googleStopwords.add("the"); googleStopwords.add("www");
}

/**
* @param key is the byte offset of the current line in the file;
* @param value is the line from the file
* @param context is used to emit the key/value pairs and to find the current input split (and thus the file name)
*
* POST-CONDITION: Output <"word@filename", 1> pairs
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Compile all the words using regex
Pattern p = Pattern.compile("\\w+");
Matcher m = p.matcher(value.toString());


// Get the name of the file from the inputsplit in the context
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();


// build the values and write pairs through the context
StringBuilder valueBuilder = new StringBuilder();
while (m.find()) {
String matchedKey = m.group().toLowerCase();
// remove names starting with non letters, digits, considered stopwords or containing other chars
if (!Character.isLetter(matchedKey.charAt(0)) || Character.isDigit(matchedKey.charAt(0))
|| googleStopwords.contains(matchedKey) || matchedKey.contains("_")) {
continue;
}
valueBuilder.append(matchedKey);
valueBuilder.append("@");
valueBuilder.append(fileName);
// emit the partial
context.write(new Text(valueBuilder.toString()), new IntWritable(1));
valueBuilder.setLength(0);
}
}
}

Job1, Mapper Unit Test



Note that the unit tests use the JUnit 4 API. The MRUnit API is also updated to use the Hadoop 0.20.1 API for the Mapper and the respective MapDriver. Generics are used to emulate the actual implementation as well.






// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mock.MockInputSplit;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
* Test cases for the word frequency mapper.
*/
public class WordFreqMapperTest extends TestCase {

private Mapper<LongWritable, Text, Text, IntWritable> mapper;
private MapDriver<LongWritable, Text, Text, IntWritable> driver;

/** We expect word@filename for the key from each of these */
private final Text KEY_SUFIX = new Text("@" + MockInputSplit.getMockPath().toString());

@Before
public void setUp() {
mapper = new WordFrequenceInDocMapper();
driver = new MapDriver(mapper);
}

@Test
public void testEmpty() {
List<Pair<Text, IntWritable>> out = null;

try {
out = driver.withInput(new LongWritable(0), new Text("")).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();

assertListEquals(expected, out);
}

@Test
public void testOneWord() {
List<Pair<Text, IntWritable>> out = null;

try {
out = driver.withInput(new LongWritable(0), new Text("foo")).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
expected.add(new Pair<Text, IntWritable>(new Text("foo" + KEY_SUFIX), new IntWritable(1)));

assertListEquals(expected, out);
}

@Test
public void testMultiWords() {
List<Pair<Text, IntWritable>> out = null;

try {
out = driver.withInput(new LongWritable(0), new Text("foo bar baz!!!! ????")).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
expected.add(new Pair<Text, IntWritable>(new Text("foo" + KEY_SUFIX), new IntWritable(1)));
expected.add(new Pair<Text, IntWritable>(new Text("bar" + KEY_SUFIX), new IntWritable(1)));
expected.add(new Pair<Text, IntWritable>(new Text("baz" + KEY_SUFIX), new IntWritable(1)));

assertListEquals(expected, out);
}
}


Job1, Reducer


// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)


package index;


import java.io.IOException;


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


/**
* WordFrequenceInDocReducer.
*/
public class WordFrequenceInDocReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


public WordFrequenceInDocReducer() {
}


/**
* @param key is the key of the mapper
* @param values are all the values aggregated during the mapping phase
* @param context contains the context of the job run
*
* PRE-CONDITION: receive a list of <"word@filename",[1, 1, 1, ...]> pairs
* <"marcello@a.txt", [1, 1]>
*
* POST-CONDITION: emit a single key-value pair whose value is the sum of the occurrences.
* <"marcello@a.txt", 2>
*/
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {


int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// write the key and the total number of occurrences
context.write(key, new IntWritable(sum));
}
}

Job1, Reducer Unit Test






// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
* Test cases for the word frequency reducer.
*/
public class WordFreqReducerTest extends TestCase {

private Reducer<Text, IntWritable, Text, IntWritable> reducer;
private ReduceDriver<Text, IntWritable, Text, IntWritable> driver;

@Before
public void setUp() {
reducer = new WordFrequenceInDocReducer();
driver = new ReduceDriver(reducer);
}

@Test
public void testOneItem() {
List<Pair<Text, IntWritable>> out = null;

try {
out = driver.withInputKey(new Text("word")).withInputValue(new IntWritable(1)).run();
} catch (IOException ioe) {
fail();
}

List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
expected.add(new Pair<Text, IntWritable>(new Text("word"), new IntWritable(1)));

assertListEquals(expected, out);
}

@Test
public void testMultiWords() {
List<Pair<Text, IntWritable>> out = null;

try {
List<IntWritable> values = new ArrayList<IntWritable>();
values.add(new IntWritable(2));
values.add(new IntWritable(5));
values.add(new IntWritable(8));
out = driver.withInput(new Text("word1"), values).run();

} catch (IOException ioe) {
fail();
}

List<Pair<Text, IntWritable>> expected = new ArrayList<Pair<Text, IntWritable>>();
expected.add(new Pair<Text, IntWritable>(new Text("word1"), new IntWritable(15)));

assertListEquals(expected, out);
}

}




Before executing the Hadoop application, make sure that the Mapper and Reducer classes pass their unit tests. Test-Driven Development helps during the development of the Mappers and Reducers by identifying problems related to incorrectly overridden methods (with Generics in particular), where a wrong "map" or "reduce" method signature may cause the designed phase to be skipped. Therefore, it is safer to run the test cases before the actual execution of the driver classes.
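
The signature problem is easy to reproduce in plain Java. In the sketch below, BaseMapper is a hypothetical stand-in and not Hadoop's Mapper class; like the real one, its default map() just passes the record through. A subclass whose map() has a slightly different parameter type compiles fine, but it only overloads the method, so the framework keeps calling the default. Adding @Override to the intended method turns that silent mistake into a compile error.

```java
import java.util.ArrayList;
import java.util.List;

/** Overload versus override; BaseMapper is an illustrative stand-in, not a Hadoop class. */
final class OverrideSketch {

    static class BaseMapper<K, V> {
        /** Default behavior: pass the record through unchanged. */
        protected void map(K key, V value, List<String> out) {
            out.add(key + "=" + value);
        }

        /** Plays the role of the framework calling map() for one record. */
        final List<String> run(K key, V value) {
            List<String> out = new ArrayList<>();
            map(key, value, out);
            return out;
        }
    }

    /** Wrong key type: this overloads map() instead of overriding it, so it is never called. */
    static class BrokenMapper extends BaseMapper<Long, String> {
        protected void map(Integer key, String value, List<String> out) {
            out.add("custom:" + value);
        }
    }

    /** With @Override the compiler checks that the signature matches the base method. */
    static class FixedMapper extends BaseMapper<Long, String> {
        @Override
        protected void map(Long key, String value, List<String> out) {
            out.add("custom:" + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BrokenMapper().run(0L, "foo"));
        System.out.println(new FixedMapper().run(0L, "foo"));
    }
}
```

A unit test that checks the mapper's output catches the broken variant right away, because the expected custom records never show up.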

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml


compile:
[javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.


test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 2, Failures: 0, Errors: 0, Time elapsed: 0.279 sec
[junit] Tests run: 2, Failures: 0, Errors: 0, Time elapsed: 0.279 sec
[junit]


BUILD SUCCESSFUL
Total time: 2 seconds

Then, the execution of the Driver can proceed. It includes the definitions of the mapper and reducer classes, and it sets the combiner class to be the same as the reducer class. Also, note that the outputKeyClass and outputValueClass are the same as the ones defined by the Reducer class! If not, Hadoop will complain! :)
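
The reducer of this job can double as the combiner because it only sums, and its input and output types are the same. Hadoop may apply a combiner zero or more times to the map output, so the final result must not depend on it. The plain-Java sketch below (CombinerSketch is a name made up for this illustration) checks that summing partial sums gives the same total as summing everything at once.

```java
import java.util.Arrays;
import java.util.List;

/** Why a summing reducer is safe to reuse as a combiner; illustrative names only. */
final class CombinerSketch {

    /** The whole "reduce" logic of Job 1: add up the values of one key. */
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two map tasks each emitted a run of 1s for the same "word@document" key.
        List<Integer> fromMapTaskA = Arrays.asList(1, 1, 1);
        List<Integer> fromMapTaskB = Arrays.asList(1, 1);

        // With a combiner, each map task pre-sums its own values before the shuffle.
        int withCombiner = sum(Arrays.asList(sum(fromMapTaskA), sum(fromMapTaskB)));
        // Without a combiner, the reducer receives every single 1.
        int withoutCombiner = sum(Arrays.asList(1, 1, 1, 1, 1));

        System.out.println(withCombiner == withoutCombiner);
    }
}
```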


Job1, Driver

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
* WordFrequenceInDocument Creates the index of the words in documents,
* mapping each of them to their frequency.
*/
public class WordFrequenceInDocument extends Configured implements Tool {


// where to put the data in hdfs when we're done
private static final String OUTPUT_PATH = "1-word-freq";


// where to read the data from.
private static final String INPUT_PATH = "input";


public int run(String[] args) throws Exception {


Configuration conf = getConf();
Job job = new Job(conf, "Word Frequence In Document");


job.setJarByClass(WordFrequenceInDocument.class);
job.setMapperClass(WordFrequenceInDocMapper.class);
job.setReducerClass(WordFrequenceInDocReducer.class);
job.setCombinerClass(WordFrequenceInDocReducer.class);


job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);


FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));


return job.waitForCompletion(true) ? 0 : 1;
}


public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordFrequenceInDocument(), args);
System.exit(res);
}
}

As specified by the Driver class, the data is read from the books listed in the input directory in the HDFS, and the output is written to the directory of this first step, "1-word-freq". The training virtual machine contains the necessary build scripts to compile the code and generate the jars for the execution of the MapReduce application, as well as to run the unit tests for each of the Mapper and Reducer classes.

training@training-vm:~/git/exercises/shakespeare$ ant
Buildfile: build.xml


compile:
[javac] Compiling 5 source files to /home/training/git/exercises/shakespeare/bin
[javac] Note: Some input files use or override a deprecated API.
[javac] Note: Recompile with -Xlint:deprecation for details.


jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar


BUILD SUCCESSFUL
Total time: 1 second

Now, executing the driver...


training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.WordFrequenceInDocument

10/01/05 16:34:54 INFO input.FileInputFormat: Total input paths to process : 3
10/01/05 16:34:54 INFO mapred.JobClient: Running job: job_200912301017_0046
10/01/05 16:34:55 INFO mapred.JobClient: map 0% reduce 0%
10/01/05 16:35:10 INFO mapred.JobClient: map 50% reduce 0%
10/01/05 16:35:13 INFO mapred.JobClient: map 66% reduce 0%
10/01/05 16:35:16 INFO mapred.JobClient: map 100% reduce 0%
10/01/05 16:35:19 INFO mapred.JobClient: map 100% reduce 33%
10/01/05 16:35:25 INFO mapred.JobClient: map 100% reduce 100%
10/01/05 16:35:27 INFO mapred.JobClient: Job complete: job_200912301017_0046
10/01/05 16:35:27 INFO mapred.JobClient: Counters: 17
10/01/05 16:35:27 INFO mapred.JobClient: Job Counters
10/01/05 16:35:27 INFO mapred.JobClient: Launched reduce tasks=1
10/01/05 16:35:27 INFO mapred.JobClient: Launched map tasks=3
10/01/05 16:35:27 INFO mapred.JobClient: Data-local map tasks=3
10/01/05 16:35:27 INFO mapred.JobClient: FileSystemCounters
10/01/05 16:35:27 INFO mapred.JobClient: FILE_BYTES_READ=3129067
10/01/05 16:35:27 INFO mapred.JobClient: HDFS_BYTES_READ=7445292
10/01/05 16:35:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=4901739
10/01/05 16:35:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1588239
10/01/05 16:35:27 INFO mapred.JobClient: Map-Reduce Framework
10/01/05 16:35:27 INFO mapred.JobClient: Reduce input groups=0
10/01/05 16:35:27 INFO mapred.JobClient: Combine output records=94108
10/01/05 16:35:27 INFO mapred.JobClient: Map input records=220255
10/01/05 16:35:27 INFO mapred.JobClient: Reduce shuffle bytes=1772576
10/01/05 16:35:27 INFO mapred.JobClient: Reduce output records=0
10/01/05 16:35:27 INFO mapred.JobClient: Spilled Records=142887
10/01/05 16:35:27 INFO mapred.JobClient: Map output bytes=27375962
10/01/05 16:35:27 INFO mapred.JobClient: Combine input records=1004372
10/01/05 16:35:27 INFO mapred.JobClient: Map output records=959043
10/01/05 16:35:27 INFO mapred.JobClient: Reduce input records=48779


The execution generates the output shown in the following listing (note that I piped the cat output to less so that you can navigate through the stream). Searching for the word "therefore" shows its use in the different documents.

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat 1-word-freq/part-r-00000 | less

...
therefore@all-shakespeare 652
therefore@leornardo-davinci-all.txt 124
therefore@the-outline-of-science-vol1.txt 36

The results produced are the intermediate data necessary for the execution of Job 2, specified in Part 2 of this tutorial, which counts the total number of words in each document using the data in the 1-word-freq directory.
