/home/by-natures/dev*

データ界隈で働くエンジニアとしての技術的なメモと、たまに普通の日記。

「Hadoop 徹底入門 第2版」第2部まとめ(Hive での開発)

Hadoop 徹底入門 第2版」を有志で読み進めているので、そのメモを貼ります。

今回は第2部 13章、Hive での開発方法と Tips です。

[toc]

Chapter 13 "SQL ライクインターフェイス Hive"

Pig と同じく MapReduce を容易に活用するための手段として、SQL ライクな言語を通じて操作するインターフェイスの Hive が開発された。

Hadoop と Hive(13.2節)

Hive のシステム構成(13.2.2節, 13.3.2節, 13.4.1節, 13.4.3節)
  • Hadoop のノードとは別建てで、Hadoop の JobClient のように動作する(Hive クライアント)
  • メタストアと呼ばれる Hive のメタ情報を管理するノードにアクセスし、そこで得た情報から Hadoop にアクセスする
  • Hive クライアント, メタストア, RDBMS の取り方から、3つのバリエーションがある(設定方法は 13.4.4節)
    • 組み込みモード:Hive クライアント、メタストア、RDBMSDerby)が単一プロセスとして存在。お試し用。
    • ローカルモード:RDBMS を独立させて動作させるモード。複数のアクセスを同時に受け付けられるが、Hive クライアントは全て同じノード上に存在する必要がある
    • リモートモード:Hive クライアント・メタストア・RDBMS 全て独立させたモード。
  • Hive のデータは HDFS 上のファイルとして存在している:
    • hdfs://user/hive/warehouse/<データベース名>.db/<テーブル名>/<読み込んだファイル>
    • 例)/user/hive/warehouse/db1.db/table1/data.csv
  • Hive のデータにインデックスの概念はなく、このままだと毎回全走査する必要がある。これを防ぐために、パーティションという概念がある
    • 特定のキー値で、物理的に HDFS 上のファイルを分けてしまうもの:
    • hdfs://user/hive/warehouse/<データベース名>.db/<テーブル名>/<キー>=<値>/<読み込んだファイル>
    • HiveQL では通常通り、WHERE 句の条件等で指定すればよい

※ データベースには default が最初から存在しており、これを利用する場合はデータベースのパスが省略される

Hive と RDBMS の違い(13.2.1節)
  • オンライン処理に不向き(結局 Hadoop なので)
    • 大容量のデータを SQL ベースで低レイテンシで処理するためのエンジンの開発も近年では盛ん。Google の Dremel, Cloudera の Impala など。
  • トランザクション管理機能がない
    • 更新の概念はないが、並列実行して、1つでも失敗した場合に全体として失敗させる…などとする場合には、ユーザー自身でジョブの管理と不要な処理結果のクリーンアップが必要。
  • 行更新不可
    • 扱うのは HDFS 上のファイルのため

HiveQL(13.3節)

テーブル定義(13.3.3節)

CREATE TABLE 文でテーブル定義を行う。

  • データ型には TIMESTAMP があるため、日付も扱いやすい
  • PARTITIONED BY 句により、指定のカラム名でパーティショニング可能(13.3.4節)
  • CLUSTERED BY 句により、テーブルやパーティションに対してバケット数を指定して分割することが可能(13.4.1節)
  • STORED AS 句により、HDFS 上にファイルを格納する際のフォーマットを指定する。
    • RCFILE は列指向のため、指定されたカラムにだけアクセスできるようになる。大量の列をもち、一部のカラムにしかアクセスしない場合などに I/O 量を大幅に減らすことが出来るため、性能向上が見込める(13.4.2節)
データの投入とテーブルからのファイル出力(13.3.4節)

HDFS 上のファイルを読む場合は LOAD, Hive のテーブルから読む場合は INSERT を利用する。ここでの INSERT は SQL のように1行1行挿入するものではないため、利用する際には文法や挙動を確認する。

また、Hive のテーブルのデータを HDFS 上に書き出す場合も INSERT を利用する。

LOAD 文によるファイルシステム上からのデータ投入
LOAD DATA [LOCAL] INPATH '<ファイルパス>' [OVERWRITE] INTO TABLE <テーブル名> [PARTITION (col1=val1, col2=val2, ...)];
  • LOCAL 句を指定した場合は、ローカルファイルシステム上のファイルをロードする
  • HDFS 上のファイルを指定した場合は、move 扱いとなり、元のパスからは消える
  • OVERWRITE 句を指定した場合は置き換え、指定しない場合は追記となる
  • PARTITION 句を指定すると、特定パーティションに関するデータのみをロードする
INSERT TABLE 文によるテーブルからのデータの投入
INSERT {OVERWRITE, INTO} TABLE <テーブル名> [PARTITION (col1=val1, col2=val2, ...)] [IF NOT EXISTS]  FROM ;
  • IF NOT EXISTS は、テーブル内にデータが存在しない場合にのみデータ投入する指定となる
マルチテーブルインサート

一つのテーブルから、複数のテーブルへ振り分ける処理。INSERT TABLE 文で、FROM 句を先頭にもってきて、その後に INSERT {OVERWRITE, INTO} を列挙する。

例)
FROM table_src
    INSERT INTO      TABLE table_dst1 SELECT col1, col2 WHERE col1 >= 0
    INSERT OVERWRITE TABLE table_dst1 SELECT col1, col2 WHERE col1 <  0;
動的パーティションインサート

同一テーブルの複数パーティションに、自動的に振り分けながらデータを投入する。

INSERT {OVERWRITE, INTO} TABLE <テーブル名> PARTITION (col1, col2, ...)  FROM 
  • 文法としては、col1=val1 の =val1 を取り除いた形
  • 動的パーティションインサートはデフォルトでは off となっているため、オプションを有効にする
  • デフォルトでは、全てのパーティションではエラーとなる(strict)
INSERT DIRECTORY 文によるテーブルからのファイル出力

クエリの実行結果をファイルシステム中のディレクトリ内に出力させることができる。

INSERT OVERWRITE [LOCAL] DIRECTORY directory1 SELECT ... FROM ...
  • INSERT DIRECTORY もマルチテーブルインサート可能
SELECT 文(13.3.5節)
GROUP BY

通常は Reduce で処理されるが、Map で集約することで Reduce 処理への転送量を減らし、処理性能を向上できる場合がある。ただし Map 処理でのメモリ使用量が増加するため、注意が必要。

ORDER BY と SORT BY

ORDER BY で出力結果全体をソートする場合、Reducer の数が一つなる。hive.mapred.mode=strict とすると、ORDER BY 句に LIMIT が必須となる。(nonstrict とすれば LIMIT 句は不要だが、適当なタイミングで処理が打ち切られる)

SORT BY を指定すると、複数の Reducer 内で処理結果をソートさせられる。ただし、結果全体でソートされたものとはならないが…

CLUSTER BY と DISTRIBUTE BY

DISTRIBUTE BY を付与すると、カラム値ごとに同じ Reducer へ処理が渡る。SORT BY と組み合わせれば、DISTRIBUTE BY で指定したカラムにおいて、ソートすることが可能。

SELECT col1, col2 FROM table1 DISTRIBUTE BY col1 SORT BY col2;

DISTRIBUTE BY, SORT BY 共に同じカラムの場合は、CLUSTER BY としてまとめることができる。

効果的な Hive の使い方(13.4節)

Hive でのチューニング(13.4.1節)
パーティショニングとJOIN

JOIN した後の WHERE だと、全走査したあとに絞り込むことになり、パーティション指定されない:

SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.key)
WHERE a.ds='2013-01-01' AND b.ds='2013-01-01'

ON 句で条件を指定すれば、パーティション指定となる:

SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.key AND a.ds='2013-01-01' AND b.ds='2013-01-01')
Map Join

JOIN の処理は Reducer で行われるが、これを Map 処理でやってしまおうというもの(分散キャッシュの機構を利用)。Shuffle, Reduce 処理が省かれ、非常に効率が良くなる。

被 JOIN 対象のテーブルをハッシュテーブルに読み込むため、メモリに乗る程度の小さなテーブルでないとだめ。

SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a
JOIN b
ON a.key = b.key

/+ MAPJOIN(b) / はコメントではなく、明示的に Map Join を利用するヒント句。Hive 0.10 からは、ヒント句なしで Map Join を利用するように最適化されている。

Bucket Map Join と Sort Merge Join

メモリに乗る程度…という制約から、両テーブルがうまくバケット分割されている場合にも Map Join が利用可能。

具体的には:

  • 両テーブルが同じキーで CLUSTERED BY 指定されている
  • 両テーブルのバケット数が、どちらかの倍数になっている

更に、ソート済みでバケット数が両テーブルで一致している場合は、ソートマージジョインが利用可能。

Hive のインストール(6.5節)

cloudera のリポジトリから yum で入る

$ sudo -u hive hive
hive>

おまじない

$ sudo chown hive.hive derby.log

テーブル作成

CREATE TABLE tracking (
             member STRING,
             visit INT,
             page_view INT,
             date_str STRING,
             ip_addr STRING,
             site STRING,
             page STRING,
             referring_site STRING,
             referring_page STRING,
             referring_query STRING,
             user_agent STRING,
             os STRING,
             app_key STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
STORED AS TEXTFILE;