ApacheSparkの基本的なデータ操作

データ操作

Spark2.1で動作確認

build.sbt

IDEAから実行できるようにbuild.sbtに以下を追加。対象のライブラリが存在しない場合はmaven_centralで確認する。

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion % "compile",
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "compile"
)

RDD

実行時の設定の読み込み

val conf = new SparkConf().setAppName("WordCountTop3").setMaster("local[*]")
val sc = new SparkContext(conf)

ハイフン区切りで単語出現数のカウント

val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordAndCountRDD.collect.foreach(println)

単語出現数をソートして上位3件を表示

val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordAndCountRDD.sortBy( _._2.toInt, ascending = false).take(3).foreach(println)

DataFrame

設定の読み込み

val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
// spark2からはSparkSessioのほうが推奨だけどエラーで使えなかった
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Spark2.1ではSQLContextではなくSparkSessionの方を推奨しているけど、エラーが発生して利用できなかった。とりあえずSQLContextで試してみる。

RDDでファイルを読み込む

val dessertRDD = sc.textFile("data/dessert-menu.csv")
  .map{ record =>
  val splitRecord = record.split(",")
  val menuId = splitRecord(0)
  val name = splitRecord(1)
  val price = splitRecord(2).toInt
  val kcal = splitRecord(3).toInt
  Dessert(menuId, name, price, kcal)
}

RDDからDataFrameへ変換

// RDD → DataFrame
val dessertDF:DataFrame = sqlContext.createDataFrame(dessertRDD)

スキーマ定義の表示

println("show schema")
println(dessertDF.printSchema)

DataFrameに対してクエリを投げる。生成ずみのDataFrameに対して直接クエリを投げるのではなくワークテーブルを作成して、そちらに対してクエリを実行する。ATAN2などの組み込み関数については公式のドキュメントを確認する。

dessertDF.createOrReplaceTempView("dessert")
val numOver300KcalDF = sqlContext.sql(
  "select count(*) as num_of_over_300Kcal FROM dessert where kcal >= 260"
)
println(numOver300KcalDF.show)
println(sqlContext.sql("select atan2(1, 3) as `ATAN2(1, 3)`").show)

DataFrameをjoinしてクエリを投げる。

val dessertOrderRDD = sc.textFile("data/dessert-order.csv")
  .map{record =>
    val splitRecord = record.split(",")
    val sId = splitRecord(0)
    val menuId = splitRecord(1)
    val num = splitRecord(2).toInt
    DessertOrder(sId, menuId, num)
  }
val dessertOrderDF:DataFrame = sqlContext.createDataFrame(dessertOrderRDD)
dessertOrderDF.createOrReplaceTempView("desert_order")

println(sqlContext.sql(
  """
    |select do.sId
    |  , d.name
    |  , do.num * d.price as amout_per_menu_per_slip
    |from desert d
    |join desert_order do on d.menuId = do.menuId
  """.stripMargin).show