Hadoopについて調べてクラスタを構築してみた

並列分散処理入門

Hadoop,Spark周りについて調べたことをまとめてみる

並列分散処理とは

並列分散処理とは複数のサーバを同時に動かしてデータを処理することである。ビッグデータと呼ばれるような大量のデータを扱う場合はたくさんのサーバを使ってクラスターを構成し、多ければ数千台のサーバを使うということもある。

並列分散処理ツール登場の背景

既存のRDBMSがある中でHadoopなどの代表的な並列分散処理ツールが登場した背景として、RDBMSではトランザクションやACIDなど複雑なデータに不整合が起きないような設計によりそのまま複数台の端末で動かそうとするのであればディスクI/Oの方がボトルネックとなりCPU資源を使い切れず速度が出し切れなくなる問題があった。この問題を解決するためHadoopなどの並列分散処理ツールではRDBMSにあったデータに不整合が発生しないように気にする機能を除外し、Mapreduceという仕組みにより複数台の端末で同時に処理してもディスクI/Oがボトルネックとならないように作られている。

Hadoopについて

概要

Hadoopとは代表的な並列分散処理ツールでGoogleのDoug cuttingにより作られた。Hadoopの名前の由来はDougの子供が持っていたゾウのぬいぐるみ。HadoopではHDFS(Hadoop Distributed File System)により複数のサーバにリソースを分散させて、Mapreduceにより複数の端末で処理を実行するということができるようになっている。
オープンソースで開発が行われておりチケット管理にJiraを使用している。
https://issues.apache.org/jira/projects/HADOOP/summary

HDFS

HDFSとは巨大なデータを扱うとことに特化した分散ファイルシステム。データサイズとして想定しているのは1ファイルが数GB以上に及ぶようなファイルであり、そのような大きいファイルを高速に扱うことに特化している。HDFS自体にレプリケーションの対障害設定があり、基本的にはHDFSの役割を果たすノードにRAID構成は不要とされている。HDFSバッチ処理に的するように作られておりデータの書き込みは一度だけで、それ以降は読み込み及び新しい書き込みが行われ、データの一部更新はできないようになっている。

Mapreduce

多数のサーバを利用して巨大なデータを分散処理するためのフレームワークMapreduceではクラスター上の各ノードに以下の役割がある。
○NameNode
 データがどこに配置されているかなどのメタデータを管理する。各ノードに対してハートビートで死活監視を行う。NameNode自体が死んだ時のためにSecondaryNameNodeを設定することもできる。
○JobTracker
MapReduceのマスターサーバで1つのジョブをタスクと呼ばれる複数の処理に分割し、各スレーブにタスクを割り振る
○DataNode
データを保存する
○TaskTracker
タスクを処理する

既存のMapreduceでは5000ノードからなる40000タスクを実行した時にJobTrackerがボトルネックとなることが確認されており、これは単一のJobTrackerに次の2つの異なる責任が課せられているためとされている。
クラスターでの計算リソースの管理
クラスター上で実行されるすべてのタスクの調整

この問題を解決するために単一のJobTrackerを使用するのではなく、クラスター・マネージャーを導入するようになった。クラスター・マネージャーの役割はクラスター内のライブ・ノードと使用可能なリソースを追跡して、タスクに割り当てることである。スレーブ側のTaskTracker側で短時間存続する JobTrackerが開始されるようになったが、これによりジョブのライフサイクルの調整は、クラスター内で使用可能なすべてのマシンに分散されさらに多くのタスクを分散処理できるようになった。この流れでYARNが登場した。

YARN

JobTrackerが担っていた2つの役割のを分割し、スレーブ側でタスクの調整が行えるよう疎結合になったことでhadoop自体の分散処理エンジン以外を使うことができるようになりSparkで処理を実行することができるようになっている。Mapreduceの時に使っていた用語は以下のように変わっている。

クラスター・マネージャー → ResourceManager
短時間存続する専用の JobTracker → ApplicationMaster
TaskTracker → NodeManager
ジョブ → 分散アプリケーション

YARNの登場により従来のMapreduceはMRv1、YARNがMRv2という扱いになった。Hadoopはversion2からYARNが使えるようになった。

Sparkについて

概要

scalaで書かれた並列分散処理のエンジン部分。HDFS上のデータにアクセスしてデータを処理することができる。Hadoopとの違いとして、Hadoopの分散処理エンジンでは計算結果をメモリ上には保存せずディスクに保存する作りになっている。それに比べてSparkでは計算結果をメモリ上に保存して再利用するという仕組みがあるためHadoopの分散処理エンジンと比べるとディスクI/Oの影響で数倍早いと言われている。このように仕組みが異なっている原因としてはHadoopとSparkの開発時期の違いが考えられ、Hadoopの開発が始まった頃はサーバ1台あたりのメモリを8GBくらいで想定していたのに対しSparkの開発時にはサーバ1台でメモリ100GB以上とか積んでいるのも珍しくないような時代背景の違いが考えられる。

用途

Sparkは以下のような用途で使われている。
バッチ処理
単純にhadoopよりも早いためバッチ処理もsparkが使われることが多いきがする
機械学習処理
機械学習のライブラリ(MLib)がありhadoopで使っていたMahoutより高速化できたという事例がある
○ストリーム処理
HDFS以外でもKafkaから送られてくるストリーミングデータを処理することができ、これによりバッチ処理、ストリーミング処理の両面でリアルタイムな機械学習でレコメンドするといったことが可能になっている

hadoopクラスタ管理

hadoopクラスタでは複数のサーバを扱わなければいけないので管理の面で気をつけなければいけないが、Clouderaが出しているマネジメントツールにより各ノードの管理が行いやすくなるらしい。clouderaのマネジメントツールを使わないにしてもディストリビューションで管理している対象ツールのバージョンを確認することでHadoopクラスタ構築時にどのバージョンを入れれば良いのかの参考になりそう(Hadoop, Spark, Hiveなど)

Hadoopクラスタ構築

実際にHadoopクラスタを構築してみたいと思います。
1.javaのインストール
hadoopはopenJdkではなくOraclejdkを推奨している
2.プログラムのダウンロード
今回はHadoopとSpark,Hiveをインストールする

wget http://ftp.jaist.ac.jp/pub/apache/hadoop/common/hadoop-2.6.5/hadoop-2.6.5.tar.gz
wget https://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
wget https://archive.apache.org/dist/hive/hive-0.12.0/hive-0.12.0-bin.tar.gz    

3.プログラムの展開及び配置
4.環境変数を追加
vi ~/.bashrc 以下を追加

# Hadoop
export HADOOP_HOME=HADOOPの展開先
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH

# spark
export SPARK_HOME=Sparkの展開先
export PATH=$SPARK_HOME/bin:$PATH

# hive
export HIVE_HOME=Hiveの展開先
export PATH=$HIVE_HOME/bin:$PATH

source ~/.bashrc

HADOOP_CONF_DIRを環境変数に追加しているが、これはSparkで使用している。

5.設定ファイルの編集
HADOOP_CONF_DIRのディレクトリの以下の設定ファイルを編集します。

core-site.xml : マスターノードの設定
hdfs-site.xml : データ保存の設定、マスター側とスレーブで設定は異なる
hive-site.xml : hive展開時のconfディレクトリにあったhive-default.xml.templateをリネーム
mapred-site.xml : Mapreduceの設定 yarnを使う場合はここで指定する
yarn-site.xml : リソースマネージャの設定

上記設定ファイルでサーバを指定する場合はIPアドレスではなくFQDNで指定しなければいけないので注意が必要
○core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ネームノードのFQDN:9000</value>
  </property>
</configuration>

hdfs-site.xml(NameNodeの場合)

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/data/1/dfs/nn</value>
  </property>
</configuration>

hdfs-site.xml(DataNodeの場合)

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/data/1/dfs/dn,/data/2/dfs/dn,/data/3/dfs/dn</value>
  </property>
</configuration>

○mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>

○yarn-site.xml

<configuration>
  <!-- Site specific YARN configuration properties -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>リソースマネージャのFQDN</value>
  </property>
  <property>
    <name>yarn.web-proxy.address</name>
    <value>WebAppProxyのFQDN:9046</value>
  </property>
</configuration>

7.サービスの起動
設定が終わったらサービスを起動させます。NameNodeリソースマネージャが同一で、それ以外にDataノード兼TaskTrackerのスレーブが連なる場合は、以下のようになります。

  • マスターノード
# $HADOOP_HOME/sbin/start-all.sh
# yarn-daemon.sh start proxyserver
# start-yarn.sh
# mr-jobhistory-daemon.sh start historyserver
  • スレーブノード
# $HADOOP_HOME/sbin/start-all.sh
# start-yarn.sh

サービス起動後は各ノードでjpsを実行しサービスが立ち上がっているか確認します。それから以下URLにアクセスします。
http://ネームノードのFQDN:50070/dfshealth.html#tab-datanode
http://ノードマネージャのFQDN:8088/cluster/nodes

8.hdfsコマンドの確認 基本的なhdfsコマンドを実行しファイルのアップロードと参照が可能か確認します。

hdfs dfs -ls /
hdfs dfs -mkdir /input
echo "upload test" > test.txt
hdfs dfs -put test.txt /input
hdfs dfs -cat /input/test.txt

上記実行ご別のノードでもアップロードしたファイルが参照できるか確認してみます。

Hadoopクラスタ上でSparkのプロジェクトを動かしてみる

Sparkの簡単なプロジェクトを作成してHadoopクラスタ上で動かしてみたいと思います。
1.プロジェクト作成
まずsbt newかintelijの新規プロジェクト作成でプロジェクトを作成します。

2.build.sbtの設定
今回はspark-coreだけで十分のはずですが一応spark-sqlとかもlibraryDependenciesに追加しています。今回はjarを生成してsparkのコマンド経由で実行するのですが、jarを生成できるようにassemblyのプラグインを追加しています。またlibraryDependenciesでspark関連をprovidedで指定することも注意してください。

name := "StudySpark"

version := "1.0"

scalaVersion := "2.11.8"
val sparkVersion = "2.1.0"

// Intelijより直接実行する場合はspark関係はcompileにする、sbt assemblyでjarを出力する場合はprovidedにしておく
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-mllib_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-graphx_2.11" % sparkVersion % "provided",
  "org.apache.spark" % "spark-hive_2.11" % sparkVersion % "provided",
  "joda-time" % "joda-time" % "2.9.7"
)

// assembly settings
assemblyJarName in assembly := "studyspark.jar"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

3.サンプルのプログラム作成
よく使われるワードカウントのプログラムを作ります。

package intoroduction.spark

import org.apache.spark.{SparkConf, SparkContext}


/**
  * 実行コマンド
  * spark-submit --master yarn \
  * --class intoroduction.spark.WordCount \
  * --name WordCount target/scala-2.11/studyspark.jar \
  * /input/test.txt
  *
  */
object WordCount {

  def main(args: Array[String]) = {
    require(args.length >= 1, "ファイルを指定してください")

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      val filePath = args(0)
      val wordAndCountRDD = sc.textFile(filePath)
                .flatMap(_.split("[ ,.]"))
                .filter(_.matches("""\p{Alnum}+"""))
                .map((_, 1)).reduceByKey(_ + _)

      wordAndCountRDD.collect.foreach(println)

    } finally {
      sc.stop()
    }
  }
}

4.jar生成 以下のコマンドでjarを生成します。
sbt assembly

5.プログラム実行
それから以下のコマンドでプログラムを実行します。

spark-submit --master yarn \
--class intoroduction.spark.rdd.WordCount \
--name WordCount target/scala-2.11/studyspark.jar \
/input/test.txt

うまくいけばコンソールに実行結果が出力されるはずです。 実行ステータスはノードマネージャのweb画面から見ることもできます。 http://ノードマネージャのFQDN:8088/cluster/nodes

デバック実行したい場合は事前に以下の環境変数を追加しておく必要があります。
export SPARK_SUBMIT_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=7777
上記環境変数追加後はport=7777でリモートデバックできます。