"word co-occurrence problem" (文書内の近くにペアで出現する単語の数をカウントする処理)の2つの基本パターンである "Pairs" と "Stripes" から、まずは、Pairs を見てみます。
事前準備
「カラマーゾフの兄弟」のテキストを HDFS に保存しておきます。
$ wget http://www.gutenberg.org/files/28054/28054.zip $ unzip 28054.zip $ hadoop fs -copyFromLocal 28054.txt Karamazov.txt
ソースコード
例えば、連続して出現する単語のペアをカウントする場合、次のようなコードが書けます。
pairs/TextPair.java
package pairs; /* テキストのタプルを Key に使用するためのクラスです。 像本のサンプル・コード http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz より、下記のソースをここに挿入 htdg-examples-0.1.1/src/main/ch04/java/TextPair.java */
pairs/Pairs.java
package pairs; import java.io.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Pairs extends Configured implements Tool { public static void main( String[] args ) throws Exception { int status = ToolRunner.run( new Pairs(), args ); System.exit( status ); } @Override public int run( String[] args ) throws Exception { Job job = new Job( getConf(), "MapReduce Job" ); job.setJarByClass( Pairs.class ); job.setMapperClass( Map.class ); job.setReducerClass( Reduce.class ); FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) ); FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) ); job.setInputFormatClass( TextInputFormat.class ); job.setOutputFormatClass( TextOutputFormat.class ); // Output format of Map job.setMapOutputKeyClass( TextPair.class ); job.setMapOutputValueClass( IntWritable.class ); // Output format of Reduce job.setOutputKeyClass( TextPair.class ); job.setOutputValueClass( IntWritable.class ); return job.waitForCompletion( true ) ? 0 : -1; } public static class Map extends Mapper<LongWritable, Text, TextPair, IntWritable> { private final static IntWritable one = new IntWritable(1); private TextPair pair = new TextPair(); protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split( "\\s+" ); for ( String s : words ) { pair.set( pair.getSecond(), new Text( s ) ); context.write( pair, one ); } } } public static class Reduce extends Reducer<TextPair, IntWritable, TextPair, IntWritable> { protected void reduce( TextPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for ( IntWritable i : values ) { count += i.get(); } context.write( key, new IntWritable( count ) ); } } }
実行例
$ javac pairs/*java; jar -cf pairs.jar pairs/ $ hadoop fs -rmr testout; hadoop jar pairs.jar pairs.Pairs -D mapred.reduce.tasks=2 Karamazov.txt testout $ hadoop fs -cat testout/part-* | grep -E "5[0-9][0-9]$" 597 I am 542 at the 589
最後のコマンドは、500〜599回登場したペアを表示しています。
考察
(1) 連続する単語の組を Map の結果の Key にするために、像本の TextPair クラスを利用しています。Hadoop で使用する Key は、WritableComparable を implement している必要があるので、構造化データを利用するときは、そのためのクラスを作ることになります。(ちょっと面倒。)
(2) 検出した単語の組ごとに、真面目に、数字の 1 を Value として出力しています。これを Reduce で全部足し上げる事で、合計値を出しています。
(3) 細かい点ですが、Map 関数の "TextPair pair" をインスタンス変数にしていますので、新しいレコードの先頭の単語は、前のレコードの最後の単語のペアとしてカウントされます。レコード間の単語のつながりを考えない場合は、"TextPair pair" は、Map 関数内のローカル変数にします。