Amazon Elastic MapReduceで日本語のwordcountを試した時のメモ #jawsug

Amazon Elastic MapReduceで日本語のwordcountを試したので、備忘録的な意味も込めてその時のメモをまとめます。pythonで書いてますが、形態素解析のライブラリがあればどの言語でも大丈夫だと思います。

目次
  1. Amazon Elastic MapReduce Ruby Clientインストール
  2. hadoopの設定
  3. 形態素解析エンジンIgo用の辞書構築
  4. bootstrap.sh作成
  5. mapper.py作成
  6. reducer.py作成
  7. 入力ファイル作成
  8. jobflow作成
  9. step追加
  10. 処理結果確認
  11. jobflow停止

Amazon Elastic MapReduce Ruby Clientインストール

http://aws.amazon.com/developertools/2264 からダウンロードしてきます。

$ wget http://elasticmapreduce.s3.amazonaws.com/elastic-mapreduce-ruby.zip
$ unzip -d elastic-mapreduce-ruby elastic-mapreduce-ruby.zip
$ sudo cp -r elastic-mapreduce-ruby /opt/elastic-mapreduce-ruby
$ export PATH=$PATH:/opt/elastic-mapreduce-ruby
credentials.json作成

以下の内容でcredentials.jsonを作成します。

$ cat /opt/elastic-mapreduce-ruby/credentials.json
{
  "access-id" : "<AWSのアクセスキー>",
  "private-key" : "<AWSのシークレットアクセスキー>",
  "key-pair" : "<キーペアの名前>",
  "key-pair-file" : "<秘密鍵(***.pem)>",
  "log-url" : "s3n://<バケット名>/logs/"
}

key-pairを指定してると後述の方法でマスターノードにSSH接続ができるので、指定してたほうが便利に使えます。

hadoopの設定

今回はS3にファイルを転送するためにhadoopコマンドを使います。別の方法で出来る人はそれでもいいと思います。

hadoopインストール

http://hadoop.apache.org/common/releases.html#Download から辿って適当なバージョンを拾ってきて解凍してパスを通せば、とりあえず今回必要な機能は使えます。僕は0.20.204を使いました。

$ wget http://ftp.kddilabs.jp/infosystems/apache//hadoop/common/hadoop-0.20.204.0/hadoop-0.20.204.0-bin.tar.gz
$ tar xvf hadoop-0.20.204.0-bin.tar.gz
$ sudo cp -r hadoop-0.20.204.0/ /opt/hadoop
$ export PATH=$PATH:/opt/hadoop/bin
HDFSのバックエンドにS3を使う

Elastic MapReduceを使う前にやっとくと幸せになるかもしれない設定 を参考に、HDFSのバックエンドにS3が使えるようにします。

形態素解析エンジンIgo用の辞書構築

http://igo.sourceforge.jp/ を参考に、辞書ファイルを構築します。
http://sourceforge.jp/projects/igo/releases/ から igo-0.4.3.jar を選んでダウンロードして下さい。
http://sourceforge.net/projects/mecab/files/mecab-ipadic/2.7.0-20070801/ から mecab-ipadic-2.7.0-20070801.tar.gz を選んでダウンロードして下さい。

$ tar xvf mecab-ipadic-2.7.0-20070801.tar.gz
$ java -cp igo-0.4.3.jar net.reduls.igo.bin.BuildDic ipadic mecab-ipadic-2.7.0-20070801 EUC-JP

できた辞書ファイルをS3に配置します

$ hadoop fs -put ipadic s3n://<バケット名>/ipadic

bootstrap.sh作成

Elastic MapReduceのjobflowを作る際に、最初に一度だけ実行されるスクリプトを作成します。このスクリプトの中で、必要なファイルをS3から持ってきたり、モジュールをインストールすることができます。

#!/bin/bash
WORKDIR=/mnt/var/lib/hadoop/tmp
S3ROOT=s3n://<バケット名>

# 必要なファイルをDLする
cd $WORKDIR
hadoop fs -get $S3ROOT/ipadic .

# 実行環境を整える
sudo easy_install igo-python

bootstrap.shをS3に配置します

$ hadoop fs -put bootstrap.sh s3n://<バケット名>/bootstrap.sh

mapper.py作成

MapReduceのMapの部分を行うプログラムを記述します。今回は名詞だけを取り出すようにしました。

# *-* coding: utf-8 *-*
import sys
import codecs
from igo.Tagger import Tagger

sys.stdin = codecs.getreader('utf-8')(sys.stdin)
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)

dic = '/mnt/var/lib/hadoop/tmp/ipadic'
tagger = Tagger(dic)

for line in sys.stdin:
  line.strip()
  tokens = tagger.parse(line)
  for token in tokens:
    if not u"名詞" in token.feature
      continue
    print "%s\t1" % token.surface

mapper.pyをS3に配置します。

$ hadoop fs -put mapper.py s3n://<バケット名>/mapper.py

reducer.py作成

MapReduceのReduceの部分を行うプログラムを記述します。Mapの出力がそのまま入力になって入ってきます。

# *-* coding: utf-8 *-*
import sys
import codecs

sys.stdin = codecs.getreader('utf-8')(sys.stdin)
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)

tokens = {}
for line in sys.stdin:
  token, count = line.split("\t")
  tokens[token] = tokens.get(token, 0) + int(count)

for token, count in tokens.items():
  print "%s\t%d" % (token, count)

reducer.pyをS3に配置します。

$ hadoop fs -put reducer.py s3n://<バケット名>/reducer.py

入力ファイル作成

解析したい文章をファイルに書いて保存します。僕は以下の内容にしました。

$ cat input.txt
今日の天気は晴れです。
天気がいいので歌を歌います。
お腹が空いたので今日は帰ります。
お腹が痛くなりそうなので明日は遅刻するつもりです。
天気予報を信じるなら、明日の天気は曇りでしょう。

input.txtをS3に配置します。

$ hadoop fs -put input.txt s3n://<バケット名>/input/input.txt

jobflow作成

jobflowを作成すると、EC2のインスタンスが起動して待機状態になります。

$ elastic-mapreduce --create --alive --bootstrap-action s3n://<バケット名>/bootstrap.sh
<JobFlowIDが表示される>

step追加

jobflowにstepを追加すると、自動的に実行されます。

$ elastic-mapreduce --jobflow <JobFlowID> --stream --input s3n://<バケット名>/input/input.txt --mapper "python s3n://<バケット名>/mapper.py" --reducer "python s3n://<バケット名>/reducer.py" --output s3n://<バケット名>/out

処理結果確認

処理の結果がS3に保存されているので、内容を確認します。

$ hadoop fs -cat s3n://<バケット名>/out/* | sort -rk 2
天気    4
明日    2
今日    2
お腹    2
予報    1
遅刻    1
晴れ    11
つもり  1
そう    1

jobflow停止

無事に処理がすんだらjobflowを停止しましょう。停止しないとEC2のインスタンスが起動したままになり、ずっと課金されます。

$ elastic-mapreduce --jobflow <JobFlowID> --terminate

Tips

簡易テスト

mapperとreducerができたら以下のコマンドで動作確認をしておくとGoodです。

$ echo "すもももももももものうち" | python mapper.py | python reducer.py
うち	1
すもも	12
もも	21
jobflowの一覧や、stepの実行状態を確認する

以下のコマンドで確認することができます。

$ elastic-mapreduce --list
マスターノードへのSSH接続

credentials.jsonにkey-pairの情報を正しく設定していれば、以下のコマンドでマスターノードに接続することができます。一度ログインして、どういう環境になっているかを確認してみるといいでしょう。

$ elastic-mapreduce --ssh <JobFlowID>

ログはマスターノードの以下のパスに出力されます。syslogをtailすることで実行状態をリアルタイムに見ることができます。

$ ls /mnt/var/log/hadoop/steps/<step番号>/
controller stderr  stdout  syslog
$ tail -f /mnt/var/log/hadoop/steps/<step番号>/syslog
クイックリファレンス

http://aws.amazon.com/jp/documentation/elasticmapreduce/Amazon Elastic MapReduce Quick Reference Cardを印刷して持っておくと、何かと便利です。

感想とか

MapReduceの処理自体はどの言語でも簡潔に書くことができると思います。Elastic MapReduceも一度使って流れをつかめば気軽に使えそうです。ただ、ちょっと複雑なことをやろうと思うとやり方が分からないことがあります。例えば、多段MapReduceをするときに前段で処理したドキュメント数が知りたかったり、ファイルから設定値みたいなのを読み込みたかったり・・・。そういう時にどうすればいいのか、詳しい人に聞いてみたいです。