読者です 読者をやめる 読者になる 読者になる

Apache Sparkを触ってみた

Apache Sparkとは?

Hadoopと同じく分散処理のフレームワークHadoopではMapReduceと言って複数マシンで分散処理を行ってから結果をストレージに書き出す。1回の処理では終わらない場合はデータの処理フローを形成することになり、よみ出し→分散処理→書き込みを繰り返す動きをする。MapReduceでは処理の中間結果が常にストレージに書き出されるためデータ量が大きくなっても動作し、障害からの回復も容易であると言ったメリットがある。
しかしこれではあるデータの部分集合に対し複数回で処理する場合、都度すべてのデータをストレージに書き込む処理が行われるため必要な計算コストが大きくなってしまう。
Sparkでは連続する処理の中で無駄なディスクやネットワークのI/Oを起こさないように処理することでMapReduceの問題に対処している。Sparkの高速化が期待できるのは、複数回の反復処理や連続する変換処理になります。

Sparkのデータ構造

ApacheSparkのデータ処理には「RDD」と呼ばれるデータ構造を利用する。RDDは複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションという塊に分割される。RDDパーティションごとに複数のマシンで処理することで分散処理が行われる。

Sparkの分散処理環境

RDDのデータ構造を分散処理するためのクラスタ管理システムには以下のようなものがある。
* YARN
Hadoopクラスタ管理システム。Hadoop分散ファイルシステムであるHDFSで利用するとI/Oが効率化されるらしい。 * Mesos * Spark Standalone Sparkに同梱されているクラスタ管理システム。別途クラスタ管理システムを用意する必要がなく手軽に利用できる。

RDDによるタスク処理

RDDには以下の情報が含まれている
* RDDの元になるデータの情報、または変換前のRDD
* RDD成の元になるデータのロード方法、変換方法
* 変換後のRDDの要素の型
* 変換後のRDDパーティション
* RDDが永続化されているかどうか

RDD内に含まれる上記の情報と遅延評価の性質により効率的なタスクのスケジューリングが可能となっているらしいけど詳しくは別でドキュメントを見たほうがよさそう。

http://kimutansk.hatenablog.com/entry/20130902/1378142510
https://www2.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf

Sparkのマシン構成

ホスト名 役割 説明
spark-client クライアント アプリケーションを起動するクライアント
spark-master マスターノード クラスタ内のリソースを管理
spark-worker00 ワーカーノード 働く

Sparkインストール

とりあえずmacにインストールしてみる
以下のページからsparkをダウンロードして解凍後binにパスを通すととりあえず動きを試すことができる http://spark.apache.org/downloads.html

パスを通した後は以下のコマンドが実行できたらApacheSparkはインストールされている

spark-shell –version

IDEAでの開発

  1. ideaでSBTプロジェクトを作成
    使用するscalaのバージョンは公式ページでapiをサポートしているバージョンを事前に確認しておく(2017/3/20時点ではscala2.12からは利用できなかった)
    http://spark.apache.org/docs/latest/
  2. 実行可能jarが生成できるようにsbt-assemblyをプラグインに追加。追加方法は以下のgithubのページを参考にする。
    https://github.com/sbt/sbt-assembly
    plugins.sbtに以下を追加する
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
    org.jboss.interceptorがunresolvedになったらplugins.sbtにさらに以下を追記
    resolvers += "JBoss" at "https://repository.jboss.org"
  3. build.sbtのlibraryDependenciesにspark-coreを追加する。
    バージョンには気をつける
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "2.1.0"
    )
  4. abt-assemblyのオプションをbuild.sbtに追加
    詳しくは以下のペジを参照。ファイル読み込み時に複数ファイルが見つかった時どうするかのために必要
    https://github.com/sbt/sbt-assembly
    assemblyMergeStrategy in assembly := {
      case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
      case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
      case "application.conf"                            => MergeStrategy.concat
      case "unwanted.txt"                                => MergeStrategy.discard
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }
    ついでに出力するjarファイル名も設定しておく
    assemblyJarName in assembly := "something.jar"
  5. 試しにjar出力
    ApacheSparkで実行可能なjarを試しに出力しておく > sbt assembly で多分失敗するので、その場合はlibraryDependenciesをprovidedにしておく。
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided"
    それから再度sbt assemlbyを実行して所定のパスにjarが生成されることを確認。ただprovidedの場合有効なスコープがコンパイル時とテスト時になる。今回はIDEAから直接実行して簡単に動きを見るだけなので一旦以下のように変更しておく。
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "compile"

簡単なスクリプトをIDEAから動かしてみる

それではSparkを実行するsbtプロジェクトが準備できたので試しにサンプルコードを動かしてみます。 以下のページにある単語の出現数をカウントするscalaのプログラムを動かしてみたいと思います。 http://spark.apache.org/examples.html

import org.apache.spark.{SparkConf, SparkContext}

object spark_shell {

  def main(args:Array[String]): Unit ={

    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    try{
      val filePath = "build.sbt"
      val wordAndCountRDD = sc.textFile(filePath)
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      wordAndCountRDD.collect.foreach(println)
    }
    sc.stop()
  }
}

まあ、単純に読み込んだファイルをスペース区切りで分割してwordごとのmapでカウントアップするだけです。 これで実行してみると以下のエラーが発生すると思います。
- SparkException: A master URL must be set in your configuration

このエラーについてですが、sparkのタスクを実行する場合spark-clientからspark-masterを指定してプログラム実行の流れになるのですが、IDEAでそのまま実行するだけではmaster未指定のため上記エラーが発生すると思われます。本番環境のクラスタリング構成で動かす場合はpark-submitのオプションで指定するのかと思います。 http://spark.apache.org/docs/latest/submitting-applications.html

今回は手軽に動きを試したいのでプログラム中のSparkConfを以下のように変更しmasterとして自分自身を指定して動きを見たいと思います。

val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

それから実行してみると環境情報等読み込んだ後に以下のような出力結果が得られると思います。

(oldStrategy,1)
(x,1)
(endsWith,1)
("2.11.8",1)
(assembly).value,1)
(scalaVersion,1)
(,97)
(@,2)
("unwanted.txt",1)
(libraryDependencies,1)
("spark-core_2.11",1)
("2.9.7",1)
("joda-time",2)
(PathList(ps,1)
(_*),2)
(assembly,3)