/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

「Hadoop 徹底入門 第2版」第2部まとめ(Java での開発)

Hadoop 徹底入門 第2版」を有志で読み進めているので、そのメモを貼ります。

今回は第2部、8章〜10章、Java での開発方法と Tips です。

[toc]

Chapter 8 "MapReduce プログラミングの基礎 -Javaによる開発(1)-"

ソースコードからのプログラム実行(8.2節)

  • WordCount.java(本付属のサンプルコードよりダウンロード可能)
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
  public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(getConf(), "word count");
    job.setNumReduceTasks(2);
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
    return(job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception{
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }
}

各種 API

Mapper

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/Mapper.html

Reducer

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/Reducer.html

Writable, Comparable, WritableComparable

入出力データの型が継承すべき抽象クラスたち。

  • データ入出力においてシリアライズが必須なため、Hadoop 独自の軽量 Writable クラスが提供されている
  • Shuffle & Sort の処理において比較可能である必要があるため、Comparable クラスが提供されている
  • 各データ型のクラスは、WritableComparable クラスを implements する
InputFormat, OutputFormat
  • TextInputFormat, TextOutputFormat
  • DBInputFormat, DBOutputFormat もある(詳しくは第9章、第10章)

Chapter 9 "MapReduce プログラミングの基礎 -Javaによる開発(2)-"

入力データの取り扱いの制御(9.2節)

  • 以下、クラスの例において例外処理は割愛する
  • 独自クラスや、デフォルトで設定されている処理クラスを変更する場合は、Job オブジェクトの set** メソッドでクラス名を指定すればよい
InputFormat(9.2.1節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/InputFormat.html

public abstract class InputFormat<K,V> {
    public abstract List<InputSplit> getSplits(JobContext context);
    public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context);
}
  • getSplits は、入力データ全体を Map タスクごとの入力になる "split" という単位に分割する
    • つまり、split の数が Map タスクの数になる
    • FileInputFormat と、これを継承した TextInputFormat の場合は、HDFS のブロック1つ1つが InputSplit と対応する
      • 例外:
        • mapred.max.split.size に、ブロックサイズよりも小さい値が設定されている場合
        • 入力ファイルが圧縮されている場合は、ファイル1つに対して InputSplit が1つとなる(分割できないため)
  • InputSplit の情報は HDFS 上の一時ファイルに書き込まれ、各 Map タスクで実行される
    • つまり Writable クラスを implements している必要がある
InputSplit(9.2.3節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/InputSplit.html

public abstract class InputSplit extends Writable {
    public abstract long getLength();
    public abstract String[] getLocations();
}
  • InputSplit の情報は HDFS 上の一時ファイルに書き込まれ、各 Map タスクで実行される
    • つまり Writable クラスを implements している必要がある
  • getLength() は、InputSplit のデータサイズをバイト数で返す
  • getLocations() は、対応するデータブロックをもっているスレーブノードのリストを返す
    • JobTracker は、getLocations() の返り値から、データのローカリティを考慮しつつタスクをスレーブノードに割り当てる
  • 実用上は上記2つのメソッドだけでは不十分。InputSplit の子クラスである FileSplit は、getPath, getStart というメソッドを新たに追加している
RecordReader(9.2.2節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/RecordReader.html

public interface RecordReader<K,V>:
 void   close() 
          Close this InputSplit to future operations.
 K  createKey() 
          Create an object of the appropriate type to be used as a key.
 V  createValue() 
          Create an object of the appropriate type to be used as a value.
 long   getPos() 
          Returns the current position in the input.
 float  getProgress() 
          How much of the input has the RecordReader consumed i.e.
 boolean    next(K key, V value) 
          Reads the next key/value pair from the input for processing.
  • Mapper の map メソッドに渡るレコードを表すクラス
  • InputFormat クラスの createRecordReader メソッドにより生成される
  • RecordReader は実際の Map 処理で必要となるため、スレーブノード上で実行される(InputSplit はジョブを投入する前に必要なため、クライアント側で実行される)

出力データの取り扱いの制御(9.3節)

  • 入力データの場合と同じように、OutputFormat, RecordWriter インタフェースが存在する(split の概念はないため、InputSplit に対応するクラスはない)。

独自のデータ型を定義する(9.4節)

(Writable, WritableComparable の説明なので割愛)

Shuffle フェーズでの動作を制御する(9.5節)

Partitioner(9.5.1節)
public abstract class Partitioner<KEY, VALUE> {
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
  • デフォルトでは HashPartition が利用され、key のハッシュ値の剰余を取っている
  • 独自の Partitioner クラスを定義し、Configuration インスタンスに設定することも可能
Comparator(9.5.2節)
  • 同じ Reducer に処理をさせるために、独自の比較クラスを定義することが可能
Combiner(9.5.4節)
  • Reduce タスクに渡す前に、Reduce タスクと同等の処理を Map タスクのノードで実行できるのが Combiner
    • Map タスクの結果を Partitioner が分類し、Combiner が実行される
  • Reduce タスクを処理する Reduce クラスと同じクラスを割り当てることも可能(カウント系の処理など、ローカル上で集約可能な場合)

MapReduce アプリケーションのテストとデバッグ(9.6節)

開発のコツ
  • 一部の不正なデータでジョブ全体が失敗することを避けるため、不正なデータが来てもジョブを継続させる
    • MapReduce ジョブ全体で例外を投げる場面は限定的
  • Hadoop MapReduce アプリケーション開発では、少ない台数と小さなデータで検証を始める
    • 段階的にクラスタ環境とデータ規模を大きくし、手戻りを小さくする
  • 基本的に Java での開発であれば何でも出来てしまうが、MapReduce の作法に従うことでスケーラビリティと耐障害性を確保する
    • タスク間で通信せず、共通のリソースに出来る限りアクセスしない
テスト
  • 単体テストは MRUnit を利用すると良い
    • map, reduce は Context クラスの write メソッドで出力されるため、返り値で捕捉できない
  • MapReduce アプリケーションのデバッグ難易度は高いため、通常のアプリケーションよりも高品質なコードが求められる
public class WordCountTest extends TestCase {
    private WordCount.TokenizerMapper mapper;
    private MapDriver driver;

    @Before
    public void setUp() {
        mapper = new WordCount.TokenizerMapper();
        driver = new MapDriver(mapper);
    }

    @Test
    public void testWordCountMapper() {
        driver.withInput(new LongWritable(0), new Text("this is a pen"))
              .withOutput(new Text("this"),   new IntWritable(1))
              .withOutput(new Text("is"),     new IntWritable(1))
              .withOutput(new Text("a"),      new IntWritable(1))
              .withOutput(new Text("pen"),    new IntWritable(1))
              .runTest();
    }
}
  • MapDriver オブジェクトを利用し、入力と出力のテストを行う例

参考

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

Imports individual tables or entire databases to files in HDFS
Generates Java classes to allow you to interact with your imported data
Provides the ability to import from SQL databases straight into your Hive data warehouse

http://blog.cloudera.com/blog/2009/06/introducing-sqoop/

Chapter 10 "MapReduce プログラミングの基礎 -Javaによる開発(3)-"

圧縮データを扱う(10.2節)

  • コーデック実装として gzip, bzip2 も利用可能だが、MapReduce アプリケーションにおいては速度重視の SnappyCodec がよい
  • 入力ファイルが圧縮されている場合は、ファイル1つにつき InputSplit が1つ作成される
    • データブロックごとに独立して伸張できる保証がないため

MapReduce での処理に適したファイルフォーマット(10.3節)

  • MapReduce アプリケーションが入出力として利用するキーバリューをバイト列として格納するためのファイル形式として、SequenceFile がある
    • ジョブとジョブの間での変換処理が減り、効率的
    • hdfs コマンドで簡単に中身を確認することもできる
    • 圧縮方法には、RECORD, BLOCK の2種類がある
      • RECORD: キーバリューのバリューを圧縮してファイルに出力する
      • BLOCK: ブロック単位で圧縮してファイルに出力する(推奨)

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/io/SequenceFile.html

一度に複数のファイルフォーマットを扱う(10.4節)

  • MultipleInputs
    • 複数のインプットに対し、それぞれ Mapper を定義する
MultipleInputs.addInputPath(job,
                            new Path("/input/txt"),
                            TextInputFormat.class,
                            FooMapper.class);                     // 独自定義する場合
MultipleInputs.addInputPath(job,
                            new Path("/input/seq"),
                            SequenceFileAsTextInputFormat.class,
                            BarMapper.class);                     // 独自定義する場合

MultipleInputs は、ファイル形式が異なるようなインプットに用いるとよい?

  • MultipleOutputs
    • reduce メソッド内にて、出力を切り替えることが出来る
private MultipleOutputs mos;
...
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
    ...
    mos.write("text", key, result);
    mos.write("seq",  key, result);
}

MultipleOutputs は、エラー出力だけ別ファイルに出力したい場合などに利用できる。

分散キャッシュの利用(10.5章)
  • 入力データに付随して必要となるデータがある場合、各スレーブノードにファイルを配布することが可能
    • 複数タスクで共有されるため、不要なファイル転送を避けることができる
  • 指定方法は2つ
    • ジョブ起動時の hadoop オプションに -files オプションとして渡すことが可能(HDFS 上になくとも良い)
    • DistributedCache クラスが提供する API を利用する(HDFS 上にないと駄目)