Tuesday, January 5, 2010

Hadoop 0.20.1 API: refactoring the InvertedLine example, removing deprecated classes (JobConf, others)

I've been learning Hadoop for the past 15 days and I have found lots of source-code examples. The basic training offered by Cloudera uses the 0.18 API, as does the Yahoo developer's tutorial that describes the Inverted Line Index example. The input of this example is a list of one or more text files containing books, and the output is the index of words appearing in each of the files, in the format <"word", "fileName@offset">, where the word is found on a given line of the given fileName at the given byte offset. Although the example works without a problem, I've read documentation about the Pig application where the majority of the warnings are caused by the API change. I'm particularly in favour of clean code without warnings whenever possible, so I started dissecting the API and was able to re-implement the examples using the Hadoop 0.20.1 API. Furthermore, the MRUnit tests must also be refactored in order to make use of the new API.

Both the Yahoo Hadoop Tutorial and the Cloudera Basic Training documentation "Writing MapReduce Programs" give the example of the InvertedIndex application. I used the Cloudera VMWare implementation and source-code as a starting point.

The first major change was the inclusion of the "mapreduce" package, containing the new Mapper and Reducer base classes, which used to be interfaces in the old "mapred" package.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MyMapper extends Mapper<LongWritable, Text, Text, Text> { }
public class MyReducer extends Reducer<Text, Text, Text, Text> { }

Also, note that these classes use Java generics, and therefore the "map()" and "reduce()" methods in your implementation must follow the type parameters you declare. Both methods replaced the OutputCollector and Reporter with a Context class, which is a nested class of each of the Mapper and Reducer classes.

protected void map(K key, V value, Mapper.Context context)
protected void reduce(K key, Iterable<V> values, Reducer.Context context)

Whatever the types of K and V are, they must be used consistently in your implementation. For instance, I used to declare the reducer's values parameter as an Iterator, and with that wrong method signature the reduce method was simply never called (the framework silently fell back to the default implementation). So, it is important to verify that you're using Iterable for the values.
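
One way to catch that kind of mistake at compile time is to annotate the methods with @Override: if the signature doesn't match the new API, the compiler rejects it instead of silently leaving the default identity implementation in place. Here is a minimal sketch of that idea (the SafeReducer class name is just for illustration, not part of the example below):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SafeReducer extends Reducer<Text, Text, Text, Text> {

    // @Override fails to compile if the signature is wrong, e.g. if the values
    // parameter were still declared as an Iterator<Text> in the old 0.18 style.
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}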

Mapper Class

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * LineIndexMapper Maps each observed word in a line to a (filename@offset) string.
 */
public class LineIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

    public LineIndexMapper() {
    }

    /**
     * Google's search stopwords
     */
    private static Set<String> googleStopwords;

    static {
        googleStopwords = new HashSet<String>();
        googleStopwords.add("I"); googleStopwords.add("a"); googleStopwords.add("about");
        googleStopwords.add("an"); googleStopwords.add("are"); googleStopwords.add("as");
        googleStopwords.add("at"); googleStopwords.add("be"); googleStopwords.add("by");
        googleStopwords.add("com"); googleStopwords.add("de"); googleStopwords.add("en");
        googleStopwords.add("for"); googleStopwords.add("from"); googleStopwords.add("how");
        googleStopwords.add("in"); googleStopwords.add("is"); googleStopwords.add("it");
        googleStopwords.add("la"); googleStopwords.add("of"); googleStopwords.add("on");
        googleStopwords.add("or"); googleStopwords.add("that"); googleStopwords.add("the");
        googleStopwords.add("this"); googleStopwords.add("to"); googleStopwords.add("was");
        googleStopwords.add("what"); googleStopwords.add("when"); googleStopwords.add("where");
        googleStopwords.add("who"); googleStopwords.add("will"); googleStopwords.add("with");
        googleStopwords.add("and"); googleStopwords.add("the"); googleStopwords.add("www");
    }

    /**
     * @param key the byte offset of the current line in the file
     * @param value the line from the file
     * @param context gives access to the input split and to the output collector
     *
     * POST-CONDITION: Output <"word", "fileName@offset"> pairs
     */
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Compile all the words using regex
        Pattern p = Pattern.compile("\\w+");
        Matcher m = p.matcher(value.toString());

        // Get the name of the file from the inputsplit in the context
        String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();

        // build the "fileName@offset" values and write the pairs through the context
        StringBuilder valueBuilder = new StringBuilder();
        while (m.find()) {
            String matchedKey = m.group().toLowerCase();
            // skip words starting with a non-letter or a digit, stopwords, and words containing "_"
            if (!Character.isLetter(matchedKey.charAt(0)) || Character.isDigit(matchedKey.charAt(0))
                    || googleStopwords.contains(matchedKey) || matchedKey.contains("_")) {
                continue;
            }
            valueBuilder.append(fileName);
            valueBuilder.append("@");
            valueBuilder.append(key.get());
            // emit the partial <"word", "fileName@offset"> pair
            context.write(new Text(matchedKey), new Text(valueBuilder.toString()));
            valueBuilder.setLength(0);
        }
    }
}


Reducer Class

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * LineIndexReducer Takes a list of filename@offset entries for a single word and concatenates them into a list.
 */
public class LineIndexReducer extends Reducer<Text, Text, Text, Text> {

    public LineIndexReducer() {
    }

    /**
     * @param key the word emitted by the mapper
     * @param values all the values aggregated during the mapping phase
     * @param context contains the context of the job run
     *
     * PRE-CONDITION: receive a list of <"word", "filename@offset"> pairs
     * <"marcello", ["a.txt@3345", "b.txt@344", "c.txt@785"]>
     *
     * POST-CONDITION: emit a single key-value pair where all the locations
     * are separated by a comma ",".
     * <"marcello", "a.txt@3345,b.txt@344,c.txt@785">
     */
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder valueBuilder = new StringBuilder();

        for (Text val : values) {
            valueBuilder.append(val);
            valueBuilder.append(",");
        }
        // write the key and the adjusted value (removing the last comma)
        context.write(key, new Text(valueBuilder.substring(0, valueBuilder.length() - 1)));
        valueBuilder.setLength(0);
    }
}


These are the changes necessary for the Mapper and Reducer classes; note that there is no longer a need to extend the old MapReduceBase class. In order to unit test these classes, changes to MRUnit are also necessary: its drivers gained a new "mapreduce" package with counterparts for the new API.

Instead of importing mrunit.MapDriver, import mapreduce.MapDriver; the same applies to the ReduceDriver. The rest of the code stays the same.

import org.apache.hadoop.mrunit.MapDriver;           // old driver for the 0.18 API
import org.apache.hadoop.mrunit.mapreduce.MapDriver; // new driver for the 0.20 API

JUnit's MapperTest

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mock.MockInputSplit;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the inverted index mapper.
 */
public class MapperTest extends TestCase {

    private Mapper<LongWritable, Text, Text, Text> mapper;
    private MapDriver<LongWritable, Text, Text, Text> driver;

    /** We expect pathname@offset for the key from each of these */
    private final Text EXPECTED_OFFSET = new Text(MockInputSplit.getMockPath().toString() + "@0");

    @Before
    public void setUp() {
        mapper = new LineIndexMapper();
        driver = new MapDriver<LongWritable, Text, Text, Text>(mapper);
    }

    @Test
    public void testEmpty() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();

        assertListEquals(expected, out);
    }

    @Test
    public void testOneWord() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiWords() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInput(new LongWritable(0), new Text("foo bar baz!!!! ????")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));
        expected.add(new Pair<Text, Text>(new Text("bar"), EXPECTED_OFFSET));
        expected.add(new Pair<Text, Text>(new Text("baz"), EXPECTED_OFFSET));

        assertListEquals(expected, out);
    }
}


JUnit's ReducerTest

// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)

package index;

import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import junit.framework.TestCase;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;

/**
 * Test cases for the inverted index reducer.
 */
public class ReducerTest extends TestCase {

    private Reducer<Text, Text, Text, Text> reducer;
    private ReduceDriver<Text, Text, Text, Text> driver;

    @Before
    public void setUp() {
        reducer = new LineIndexReducer();
        driver = new ReduceDriver<Text, Text, Text, Text>(reducer);
    }

    @Test
    public void testOneOffset() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset")));

        assertListEquals(expected, out);
    }

    @Test
    public void testMultiOffset() {
        List<Pair<Text, Text>> out = null;

        try {
            out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset1")).withInputValue(
                    new Text("offset2")).run();
        } catch (IOException ioe) {
            fail();
        }

        List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
        expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset1,offset2")));

        assertListEquals(expected, out);
    }

}


You can test them by running the command "ant test" in the source-code directory, as usual, to confirm that the implementation is correct:

training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml

compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin

test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit]

BUILD SUCCESSFUL
Total time: 2 seconds



Replacing JobConf and other deprecated classes

Other API-related changes concern how job execution is configured. The class "JobConf" was deprecated, but most of the tutorials have not been updated. So, here's the updated version of the main example driver using the Configuration and Job classes. Note that the job is configured and executed with a default Configuration, and the Job class is now responsible for configuring the execution of the tasks. Once again, it is important to replace the classes from the "mapred" package, since their new counterparts are located in the "mapreduce" package.

InvertedIndex driver


// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * LineIndexer Creates an inverted index over all the words in a document corpus, mapping each observed word to a list
 * of filename@offset locations where it occurs.
 */
public class LineIndexer extends Configured implements Tool {

    // where to put the data in hdfs when we're done
    private static final String OUTPUT_PATH = "output";

    // where to read the data from
    private static final String INPUT_PATH = "input";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LineIndexer(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {

        Configuration conf = getConf();
        Job job = new Job(conf, "Line Indexer 1");

        job.setJarByClass(LineIndexer.class);
        job.setMapperClass(LineIndexMapper.class);
        job.setReducerClass(LineIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}


After updating, make sure to generate a new jar, remove anything under the directory "output" (since the program does not clean that up), and execute the new version.
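
If you'd rather not clean up by hand with "hadoop fs -rmr output" before each run, the driver can delete the previous output itself. The helper below is only a sketch of that idea (the OutputCleaner class and its method name are my own, not part of the original example):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper that removes a previous output directory, if any. */
public class OutputCleaner {

    /** Recursively deletes the given directory from the job's file system if it exists. */
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            fs.delete(path, true); // "true" means delete recursively
        }
    }
}

Calling something like "OutputCleaner.deleteIfExists(conf, OUTPUT_PATH);" in run(), before FileOutputFormat.setOutputPath(), would make re-runs skip the manual cleanup of the output directory.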

training@training-vm:~/git/exercises/shakespeare$ ant jar
Buildfile: build.xml

compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin

jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar

BUILD SUCCESSFUL
Total time: 1 second


I have added 2 ASCII books to the input directory: the works of Leonardo Da Vinci and the first volume of the book "The Outline of Science".

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls input
Found 3 items
-rw-r--r-- 1 training supergroup 5342761 2009-12-30 11:57 /user/training/input/all-shakespeare
-rw-r--r-- 1 training supergroup 1427769 2010-01-04 17:42 /user/training/input/leornardo-davinci-all.txt
-rw-r--r-- 1 training supergroup 674762 2010-01-04 17:42 /user/training/input/the-outline-of-science-vol1.txt


The execution and the output of running this example are shown below.

training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.LineIndexer
10/01/04 21:11:55 INFO input.FileInputFormat: Total input paths to process : 3
10/01/04 21:11:56 INFO mapred.JobClient: Running job: job_200912301017_0017
10/01/04 21:11:57 INFO mapred.JobClient: map 0% reduce 0%
10/01/04 21:12:07 INFO mapred.JobClient: map 33% reduce 0%
10/01/04 21:12:10 INFO mapred.JobClient: map 58% reduce 0%
10/01/04 21:12:13 INFO mapred.JobClient: map 63% reduce 0%
10/01/04 21:12:16 INFO mapred.JobClient: map 100% reduce 11%
10/01/04 21:12:28 INFO mapred.JobClient: map 100% reduce 77%
10/01/04 21:12:34 INFO mapred.JobClient: map 100% reduce 100%
10/01/04 21:12:36 INFO mapred.JobClient: Job complete: job_200912301017_0017
10/01/04 21:12:36 INFO mapred.JobClient: Counters: 17
10/01/04 21:12:36 INFO mapred.JobClient: Job Counters
10/01/04 21:12:36 INFO mapred.JobClient: Launched reduce tasks=1
10/01/04 21:12:36 INFO mapred.JobClient: Launched map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient: Data-local map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient: FileSystemCounters
10/01/04 21:12:36 INFO mapred.JobClient: FILE_BYTES_READ=58068623
10/01/04 21:12:36 INFO mapred.JobClient: HDFS_BYTES_READ=7445292
10/01/04 21:12:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=92132872
10/01/04 21:12:36 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=26638259
10/01/04 21:12:36 INFO mapred.JobClient: Map-Reduce Framework
10/01/04 21:12:36 INFO mapred.JobClient: Reduce input groups=0
10/01/04 21:12:36 INFO mapred.JobClient: Combine output records=0
10/01/04 21:12:36 INFO mapred.JobClient: Map input records=220255
10/01/04 21:12:36 INFO mapred.JobClient: Reduce shuffle bytes=34064153
10/01/04 21:12:36 INFO mapred.JobClient: Reduce output records=0
10/01/04 21:12:36 INFO mapred.JobClient: Spilled Records=2762272
10/01/04 21:12:36 INFO mapred.JobClient: Map output bytes=32068217
10/01/04 21:12:36 INFO mapred.JobClient: Combine input records=0
10/01/04 21:12:36 INFO mapred.JobClient: Map output records=997959
10/01/04 21:12:36 INFO mapred.JobClient: Reduce input records=997959


The index entry for the word "abandoned" is an example of one present in all of the books:

training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat output/part-r-00000 | less
...
...

abandoned leornardo-davinci-all.txt@1257995,leornardo-davinci-all.txt@652992,all-shakespeare@4657862,all-shakespeare@738818,the-outline-of-science-vol1.txt@642211,the-outline-of-science-vol1.txt@606442,the-outline-of-science-vol1.txt@641585
...
...

6 comments:

Unknown said...

I've been trying to get your code to work with hadoop 0.20.2 and I'm seeing this error:

bash-3.2$ hadoop jar /code/hadoop/indexer/LineIndexer.jar LineIndexer
10/05/24 00:55:19 INFO input.FileInputFormat: Total input paths to process : 1
10/05/24 00:55:19 INFO mapred.JobClient: Running job: job_201005181230_0054
10/05/24 00:55:20 INFO mapred.JobClient: map 0% reduce 0%
10/05/24 00:55:29 INFO mapred.JobClient: Task Id : attempt_201005181230_0054_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:845)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:541)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)


Any thoughts?
I'm not building this via ant, I just pulled your source from the website and build it manually like so:

javac -classpath /usr/local/hadoop/hadoop-0.20.2-core.jar:. LineIndexMapper.java LineIndexReducer.java LineIndexer.java

Thanks for the information and any light you can shed on this issue I'm hitting.
Best Regards,
-Joe

Prasanna said...

Thanks! Was searching all over the net for this example.

Maheshwaran J said...

How to set Optional parameters for Task JVM reuse, Skipbadrecords without use of JobConf???

hiepnh said...

I encountered the same error "Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable",

and found that the problem was in the line "LineIndexMapper extends Mapper"

it must be "LineIndexMapper extends Mapper < LongWritable, Text, Text, Text >"

Have fun.

hiepnh said...

And you must do the same to Reducer

LineIndexReducer extends Reducer < Text, Text, Text, Text >

or you will get a wrong result with Hadoop-0.20.2 as below:

a file1@1
a file1@3
...
