Both the Yahoo! Hadoop Tutorial and the Cloudera Basic Training documentation "Writing MapReduce Programs" use the InvertedIndex application as an example. I used the Cloudera VMware implementation and source code as a starting point.
The first major change was the introduction of the "mapreduce" package, which contains the new implementations of the Mapper and Reducer classes; these were interfaces in the previous API, located in the "mapred" package.
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
public class MyReducer extends Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { }
Also note that these classes use Java generics, so the "map()" and "reduce()" methods must follow the signature convention determined by the type parameters of your implementation. Both methods also replace the OutputCollector and Reporter with a Context class, which is a nested class of Mapper and of Reducer.
protected void map(K key, V value, Mapper.Context context)
protected void reduce(K key, Iterable<V> values, Reducer.Context context)
Whatever the types of K and V are, they must be used consistently in the implementation. For instance, I used to have an Iterator (instead of Iterable) for the values in the reducer, and the reduce method was never called because of the wrong method signature. So it is important to verify that you're using the Iterable class for the values.
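To make the point concrete, here is a minimal sketch of the new-API skeleton (the class names and key/value types are just illustrative, not part of the original tutorial); note how the type parameters on the class declarations match the map()/reduce() signatures:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class SkeletonMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // pairs are emitted through the Context instead of an OutputCollector
        context.write(new Text("someKey"), value);
    }
}

class SkeletonReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // values is an Iterable, not an Iterator; if the signature does not match,
        // the framework silently falls back to the default identity reduce()
        for (Text value : values) {
            context.write(key, value);
        }
    }
}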
Mapper Class
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
* LineIndexMapper Maps each observed word in a line to a (filename@offset) string.
*/
public class LineIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
public LineIndexMapper() {
}
/**
* Google's search Stopwords
*/
private static Set<String> googleStopwords;
static {
googleStopwords = new HashSet<String>();
googleStopwords.add("I"); googleStopwords.add("a"); googleStopwords.add("about");
googleStopwords.add("an"); googleStopwords.add("are"); googleStopwords.add("as");
googleStopwords.add("at"); googleStopwords.add("be"); googleStopwords.add("by");
googleStopwords.add("com"); googleStopwords.add("de"); googleStopwords.add("en");
googleStopwords.add("for"); googleStopwords.add("from"); googleStopwords.add("how");
googleStopwords.add("in"); googleStopwords.add("is"); googleStopwords.add("it");
googleStopwords.add("la"); googleStopwords.add("of"); googleStopwords.add("on");
googleStopwords.add("or"); googleStopwords.add("that"); googleStopwords.add("the");
googleStopwords.add("this"); googleStopwords.add("to"); googleStopwords.add("was");
googleStopwords.add("what"); googleStopwords.add("when"); googleStopwords.add("where");
googleStopwords.add("who"); googleStopwords.add("will"); googleStopwords.add("with");
googleStopwords.add("and"); googleStopwords.add("the"); googleStopwords.add("www");
}
/**
* @param key is the byte offset of the current line in the file;
* @param value is the line from the file
* @param context has the method "write()" to output the <key, value> pair and also gives access to job information (like the current filename)
*
* POST-CONDITION: Output <"word", "filename@offset"> pairs
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Compile all the words using regex
Pattern p = Pattern.compile("\\w+");
Matcher m = p.matcher(value.toString());
// Get the name of the file from the inputsplit in the context
String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
// build the values and write
StringBuilder valueBuilder = new StringBuilder();
while (m.find()) {
String matchedKey = m.group().toLowerCase();
// skip tokens that start with a non-letter or a digit, are stopwords, or contain "_"
if (!Character.isLetter(matchedKey.charAt(0)) || Character.isDigit(matchedKey.charAt(0))
|| googleStopwords.contains(matchedKey) || matchedKey.contains("_")) {
continue;
}
valueBuilder.append(fileName);
valueBuilder.append("@");
valueBuilder.append(key.get());
// emit the partial
context.write(new Text(matchedKey), new Text(valueBuilder.toString()));
valueBuilder.setLength(0);
}
}
}
Reducer Class
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* LineIndexReducer Takes a list of filename@offset entries for a single word and concatenates them into a list.
*/
public class LineIndexReducer extends Reducer<Text, Text, Text, Text> {
public LineIndexReducer() {
}
/**
* @param key is the key of the mapper
* @param values are all the values aggregated during the mapping phase
* @param context contains the context of the job run
*
* PRE-CONDITION: receive a list of <"word", "filename@offset"> pairs
* <"marcello", ["a.txt@3345", "b.txt@344", "c.txt@785"]>
*
* POST-CONDITION: emit a single key-value pair in which all the filename@offset
* entries are separated by a comma ",".
* <"marcello", "a.txt@3345,b.txt@344,c.txt@785">
*/
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder valueBuilder = new StringBuilder();
for (Text val : values) {
valueBuilder.append(val);
valueBuilder.append(",");
}
//write the key and the adjusted value (removing the last comma)
context.write(key, new Text(valueBuilder.substring(0, valueBuilder.length() - 1)));
valueBuilder.setLength(0);
}
}
These are the changes necessary for the Mapper and Reducer classes, without the need to extend the old MapReduceBase helper class. In order to unit test these classes, corresponding changes to MRUnit are also necessary: its drivers likewise gained a new "mapreduce" package with the same counterparts.
Instead of mrunit.MapDriver, use mrunit.mapreduce.MapDriver, and likewise for the ReduceDriver. The rest of the code stays the same.
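As a minimal sketch of that change (the type parameters simply mirror the generic types of the classes under test, and the class and field names are just illustrative):
package index;

// Old API tests imported the drivers from the root mrunit package:
// import org.apache.hadoop.mrunit.MapDriver;
// import org.apache.hadoop.mrunit.ReduceDriver;
// New API tests import them from the mapreduce sub-package:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;

public class DriverSetupSketch {
    // the drivers are parameterized with the input/output types of the classes under test
    private final MapDriver<LongWritable, Text, Text, Text> mapDriver =
            new MapDriver<LongWritable, Text, Text, Text>(new LineIndexMapper());
    private final ReduceDriver<Text, Text, Text, Text> reduceDriver =
            new ReduceDriver<Text, Text, Text, Text>(new LineIndexReducer());
}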
JUnit's MapperTest
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;
import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mock.MockInputSplit;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
/**
* Test cases for the inverted index mapper.
*/
public class MapperTest extends TestCase {
private Mapper<LongWritable, Text, Text, Text> mapper;
private MapDriver<LongWritable, Text, Text, Text> driver;
/** We expect pathname@offset for the key from each of these */
private final Text EXPECTED_OFFSET = new Text(MockInputSplit.getMockPath().toString() + "@0");
@Before
public void setUp() {
mapper = new LineIndexMapper();
driver = new MapDriver<LongWritable, Text, Text, Text>(mapper);
}
@Test
public void testEmpty() {
List<Pair<Text, Text>> out = null;
try {
out = driver.withInput(new LongWritable(0), new Text("")).run();
} catch (IOException ioe) {
fail();
}
List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
assertListEquals(expected, out);
}
@Test
public void testOneWord() {
List<Pair<Text, Text>> out = null;
try {
out = driver.withInput(new LongWritable(0), new Text("foo")).run();
} catch (IOException ioe) {
fail();
}
List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));
assertListEquals(expected, out);
}
@Test
public void testMultiWords() {
List<Pair<Text, Text>> out = null;
try {
out = driver.withInput(new LongWritable(0), new Text("foo bar baz!!!! ????")).run();
} catch (IOException ioe) {
fail();
}
List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("foo"), EXPECTED_OFFSET));
expected.add(new Pair<Text, Text>(new Text("bar"), EXPECTED_OFFSET));
expected.add(new Pair<Text, Text>(new Text("baz"), EXPECTED_OFFSET));
assertListEquals(expected, out);
}
}
JUnit's ReducerTest
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;
import static org.apache.hadoop.mrunit.testutil.ExtendedAssert.assertListEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.junit.Before;
import org.junit.Test;
/**
* Test cases for the inverted index reducer.
*/
public class ReducerTest extends TestCase {
private Reducer<Text, Text, Text, Text> reducer;
private ReduceDriver<Text, Text, Text, Text> driver;
@Before
public void setUp() {
reducer = new LineIndexReducer();
driver = new ReduceDriver<Text, Text, Text, Text>(reducer);
}
@Test
public void testOneOffset() {
List<Pair<Text, Text>> out = null;
try {
out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset")).run();
} catch (IOException ioe) {
fail();
}
List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset")));
assertListEquals(expected, out);
}
@Test
public void testMultiOffset() {
List<Pair<Text, Text>> out = null;
try {
out = driver.withInputKey(new Text("word")).withInputValue(new Text("offset1")).withInputValue(
new Text("offset2")).run();
} catch (IOException ioe) {
fail();
}
List<Pair<Text, Text>> expected = new ArrayList<Pair<Text, Text>>();
expected.add(new Pair<Text, Text>(new Text("word"), new Text("offset1,offset2")));
assertListEquals(expected, out);
}
}
You can run them with the command "ant test" in the source code directory, as usual, to confirm that the implementation is correct:
training@training-vm:~/git/exercises/shakespeare$ ant test
Buildfile: build.xml
compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin
test:
[junit] Running index.AllTests
[junit] Testsuite: index.AllTests
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit] Tests run: 5, Failures: 0, Errors: 0, Time elapsed: 0.418 sec
[junit]
BUILD SUCCESSFUL
Total time: 2 seconds
Replacing JobConf and other deprecated classes
Other API changes concern how job execution is configured. The class "JobConf" was deprecated, but most of the tutorials have not been updated, so here's the updated version of the main example driver using the Configuration and Job classes. Note that the job is configured and executed with the default Configuration, which is the class responsible for configuring the execution of the tasks. Once again, it is important to replace the classes located in the "mapred" package, since the new classes are located in the "mapreduce" package.
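For comparison, a driver written against the deprecated API looked roughly like the sketch below; this is reconstructed from the old "mapred" classes as an illustration, not taken from the original tutorial, and it assumes old-API mapper/reducer implementations that are not shown here:
// Old-style driver sketch using the deprecated "mapred" API (for comparison only)
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class OldStyleLineIndexer {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(OldStyleLineIndexer.class);
        conf.setJobName("Line Indexer (old API)");
        // with the old API, the mapper and reducer would implement the
        // org.apache.hadoop.mapred.Mapper/Reducer interfaces and would be
        // registered here with conf.setMapperClass(...) / conf.setReducerClass(...)
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(conf, new Path("input"));
        FileOutputFormat.setOutputPath(conf, new Path("output"));
        JobClient.runJob(conf);
    }
}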
InvertedIndex driver
// (c) Copyright 2009 Cloudera, Inc.
// Hadoop 0.20.1 API Updated by Marcello de Sales (marcello.desales@gmail.com)
package index;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* LineIndexer Creates an inverted index over all the words in a document corpus, mapping each observed word to a list
* of filename@offset locations where it occurs.
*/
public class LineIndexer extends Configured implements Tool {
// where to put the data in hdfs when we're done
private static final String OUTPUT_PATH = "output";
// where to read the data from.
private static final String INPUT_PATH = "input";
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new LineIndexer(), args);
System.exit(res);
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = new Job(conf, "Line Indexer 1");
job.setJarByClass(LineIndexer.class);
job.setMapperClass(LineIndexMapper.class);
job.setReducerClass(LineIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
return job.waitForCompletion(true) ? 0 : 1;
}
}
After updating, make sure to generate a new jar, remove anything under the "output" directory (since the program does not clean it up), and execute the new version.
Buildfile: build.xml
compile:
[javac] Compiling 4 source files to /home/training/git/exercises/shakespeare/bin
jar:
[jar] Building jar: /home/training/git/exercises/shakespeare/indexer.jar
BUILD SUCCESSFUL
Total time: 1 second
I added two ASCII books to the input directory: the works of Leonardo da Vinci and the first volume of "The Outline of Science".
training@training-vm:~/git/exercises/shakespeare$ hadoop fs -ls input
Found 3 items
-rw-r--r-- 1 training supergroup 5342761 2009-12-30 11:57 /user/training/input/all-shakespeare
-rw-r--r-- 1 training supergroup 1427769 2010-01-04 17:42 /user/training/input/leornardo-davinci-all.txt
-rw-r--r-- 1 training supergroup 674762 2010-01-04 17:42 /user/training/input/the-outline-of-science-vol1.txt
The execution and output of running this example are shown below.
training@training-vm:~/git/exercises/shakespeare$ hadoop jar indexer.jar index.LineIndexer
10/01/04 21:11:55 INFO input.FileInputFormat: Total input paths to process : 3
10/01/04 21:11:56 INFO mapred.JobClient: Running job: job_200912301017_0017
10/01/04 21:11:57 INFO mapred.JobClient: map 0% reduce 0%
10/01/04 21:12:07 INFO mapred.JobClient: map 33% reduce 0%
10/01/04 21:12:10 INFO mapred.JobClient: map 58% reduce 0%
10/01/04 21:12:13 INFO mapred.JobClient: map 63% reduce 0%
10/01/04 21:12:16 INFO mapred.JobClient: map 100% reduce 11%
10/01/04 21:12:28 INFO mapred.JobClient: map 100% reduce 77%
10/01/04 21:12:34 INFO mapred.JobClient: map 100% reduce 100%
10/01/04 21:12:36 INFO mapred.JobClient: Job complete: job_200912301017_0017
10/01/04 21:12:36 INFO mapred.JobClient: Counters: 17
10/01/04 21:12:36 INFO mapred.JobClient: Job Counters
10/01/04 21:12:36 INFO mapred.JobClient: Launched reduce tasks=1
10/01/04 21:12:36 INFO mapred.JobClient: Launched map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient: Data-local map tasks=3
10/01/04 21:12:36 INFO mapred.JobClient: FileSystemCounters
10/01/04 21:12:36 INFO mapred.JobClient: FILE_BYTES_READ=58068623
10/01/04 21:12:36 INFO mapred.JobClient: HDFS_BYTES_READ=7445292
10/01/04 21:12:36 INFO mapred.JobClient: FILE_BYTES_WRITTEN=92132872
10/01/04 21:12:36 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=26638259
10/01/04 21:12:36 INFO mapred.JobClient: Map-Reduce Framework
10/01/04 21:12:36 INFO mapred.JobClient: Reduce input groups=0
10/01/04 21:12:36 INFO mapred.JobClient: Combine output records=0
10/01/04 21:12:36 INFO mapred.JobClient: Map input records=220255
10/01/04 21:12:36 INFO mapred.JobClient: Reduce shuffle bytes=34064153
10/01/04 21:12:36 INFO mapred.JobClient: Reduce output records=0
10/01/04 21:12:36 INFO mapred.JobClient: Spilled Records=2762272
10/01/04 21:12:36 INFO mapred.JobClient: Map output bytes=32068217
10/01/04 21:12:36 INFO mapred.JobClient: Combine input records=0
10/01/04 21:12:36 INFO mapred.JobClient: Map output records=997959
10/01/04 21:12:36 INFO mapred.JobClient: Reduce input records=997959
The index entry for the word "abandoned" is an example of one present in all of the books:
training@training-vm:~/git/exercises/shakespeare$ hadoop fs -cat output/part-r-00000 | less
...
...
abandoned leornardo-davinci-all.txt@1257995,leornardo-davinci-all.txt@652992,all-shakespeare@4657862,all-shakespeare@738818,the-outline-of-science-vol1.txt@642211,the-outline-of-science-vol1.txt@606442,the-outline-of-science-vol1.txt@641585...
...
6 comments:
I've been trying to get your code to work with hadoop 0.20.2 and I'm seeing this error:
bash-3.2$ hadoop jar /code/hadoop/indexer/LineIndexer.jar LineIndexer
10/05/24 00:55:19 INFO input.FileInputFormat: Total input paths to process : 1
10/05/24 00:55:19 INFO mapred.JobClient: Running job: job_201005181230_0054
10/05/24 00:55:20 INFO mapred.JobClient: map 0% reduce 0%
10/05/24 00:55:29 INFO mapred.JobClient: Task Id : attempt_201005181230_0054_m_000000_0, Status : FAILED
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:845)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:541)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:621)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Any thoughts?
I'm not building this via ant; I just pulled your source from the website and built it manually like so:
javac -classpath /usr/local/hadoop/hadoop-0.20.2-core.jar:. LineIndexMapper.java LineIndexReducer.java LineIndexer.java
Thanks for the information and any light you can shed on this issue I'm hitting.
Best Regards,
-Joe
Thanks! Was searching all over the net for this example.
How do you set optional parameters for task JVM reuse and SkipBadRecords without using JobConf?
I encountered the same error "Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable",
and found that the problem was in the line "LineIndexMapper extends Mapper";
it must be "LineIndexMapper extends Mapper<LongWritable, Text, Text, Text>".
Have fun.
And you must do the same for the Reducer:
LineIndexReducer extends Reducer<Text, Text, Text, Text>
or you will get a wrong result with Hadoop-0.20.2 as below:
a file1@1
a file1@3
...