Apache Sparkを触ってみた
Apache Sparkとは?
Hadoopと同じく分散処理のフレームワーク。HadoopではMapReduceと言って複数マシンで分散処理を行ってから結果をストレージに書き出す。1回の処理では終わらない場合はデータの処理フローを形成することになり、よみ出し→分散処理→書き込みを繰り返す動きをする。MapReduceでは処理の中間結果が常にストレージに書き出されるためデータ量が大きくなっても動作し、障害からの回復も容易であると言ったメリットがある。
しかしこれではあるデータの部分集合に対し複数回で処理する場合、都度すべてのデータをストレージに書き込む処理が行われるため必要な計算コストが大きくなってしまう。
Sparkでは連続する処理の中で無駄なディスクやネットワークのI/Oを起こさないように処理することでMapReduceの問題に対処している。Sparkの高速化が期待できるのは、複数回の反復処理や連続する変換処理になります。
Sparkのデータ構造
ApacheSparkのデータ処理には「RDD」と呼ばれるデータ構造を利用する。RDDは複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションという塊に分割される。RDDをパーティションごとに複数のマシンで処理することで分散処理が行われる。
Sparkの分散処理環境
RDDのデータ構造を分散処理するためのクラスタ管理システムには以下のようなものがある。
* YARN
Hadoopのクラスタ管理システム。Hadoopの分散ファイルシステムであるHDFSで利用するとI/Oが効率化されるらしい。
* Mesos
* Spark Standalone
Sparkに同梱されているクラスタ管理システム。別途クラスタ管理システムを用意する必要がなく手軽に利用できる。
RDDによるタスク処理
RDDには以下の情報が含まれている
* RDDの元になるデータの情報、または変換前のRDD
* RDD成の元になるデータのロード方法、変換方法
* 変換後のRDDの要素の型
* 変換後のRDDのパーティション数
* RDDが永続化されているかどうか
RDD内に含まれる上記の情報と遅延評価の性質により効率的なタスクのスケジューリングが可能となっているらしいけど詳しくは別でドキュメントを見たほうがよさそう。
http://kimutansk.hatenablog.com/entry/20130902/1378142510
https://www2.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
Sparkのマシン構成
ホスト名 | 役割 | 説明 |
---|---|---|
spark-client | クライアント | アプリケーションを起動するクライアント |
spark-master | マスターノード | クラスタ内のリソースを管理 |
spark-worker00 | ワーカーノード | 働く |
Sparkインストール
とりあえずmacにインストールしてみる
以下のページからsparkをダウンロードして解凍後binにパスを通すととりあえず動きを試すことができる
http://spark.apache.org/downloads.html
パスを通した後は以下のコマンドが実行できたらApacheSparkはインストールされている
spark-shell –version
IDEAでの開発
- ideaでSBTプロジェクトを作成
使用するscalaのバージョンは公式ページでapiをサポートしているバージョンを事前に確認しておく(2017/3/20時点ではscala2.12からは利用できなかった)
http://spark.apache.org/docs/latest/ - 実行可能jarが生成できるようにsbt-assemblyをプラグインに追加。追加方法は以下のgithubのページを参考にする。
https://github.com/sbt/sbt-assembly
plugins.sbtに以下を追加する
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
org.jboss.interceptorがunresolvedになったらplugins.sbtにさらに以下を追記
resolvers += "JBoss" at "https://repository.jboss.org"
- build.sbtのlibraryDependenciesにspark-coreを追加する。
バージョンには気をつける
libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.1.0" )
- abt-assemblyのオプションをbuild.sbtに追加
詳しくは以下のペジを参照。ファイル読み込み時に複数ファイルが見つかった時どうするかのために必要
https://github.com/sbt/sbt-assemblyassemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first case "application.conf" => MergeStrategy.concat case "unwanted.txt" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) }
ついでに出力するjarファイル名も設定しておくassemblyJarName in assembly := "something.jar"
- 試しにjar出力
ApacheSparkで実行可能なjarを試しに出力しておく > sbt assembly で多分失敗するので、その場合はlibraryDependenciesをprovidedにしておく。"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided"
それから再度sbt assemlbyを実行して所定のパスにjarが生成されることを確認。ただprovidedの場合有効なスコープがコンパイル時とテスト時になる。今回はIDEAから直接実行して簡単に動きを見るだけなので一旦以下のように変更しておく。"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "compile"
簡単なスクリプトをIDEAから動かしてみる
それではSparkを実行するsbtプロジェクトが準備できたので試しにサンプルコードを動かしてみます。 以下のページにある単語の出現数をカウントするscalaのプログラムを動かしてみたいと思います。 http://spark.apache.org/examples.html
import org.apache.spark.{SparkConf, SparkContext} object spark_shell { def main(args:Array[String]): Unit ={ val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) try{ val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.collect.foreach(println) } sc.stop() } }
まあ、単純に読み込んだファイルをスペース区切りで分割してwordごとのmapでカウントアップするだけです。
これで実行してみると以下のエラーが発生すると思います。
- SparkException: A master URL must be set in your configuration
このエラーについてですが、sparkのタスクを実行する場合spark-clientからspark-masterを指定してプログラム実行の流れになるのですが、IDEAでそのまま実行するだけではmaster未指定のため上記エラーが発生すると思われます。本番環境のクラスタリング構成で動かす場合はpark-submitのオプションで指定するのかと思います。 http://spark.apache.org/docs/latest/submitting-applications.html
今回は手軽に動きを試したいのでプログラム中のSparkConfを以下のように変更しmasterとして自分自身を指定して動きを見たいと思います。
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
それから実行してみると環境情報等読み込んだ後に以下のような出力結果が得られると思います。
(oldStrategy,1) (x,1) (endsWith,1) ("2.11.8",1) (assembly).value,1) (scalaVersion,1) (,97) (@,2) ("unwanted.txt",1) (libraryDependencies,1) ("spark-core_2.11",1) ("2.9.7",1) ("joda-time",2) (PathList(ps,1) (_*),2) (assembly,3)