ApacheSparkの基本的なデータ操作
データ操作
Spark2.1で動作確認
build.sbt
IDEAから実行できるようにbuild.sbtに以下を追加。対象のライブラリが存在しない場合はmaven_centralで確認する。
val sparkVersion = "2.1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion % "compile", "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "compile" )
RDD
実行時の設定の読み込み
val conf = new SparkConf().setAppName("WordCountTop3").setMaster("local[*]")
val sc = new SparkContext(conf)
ハイフン区切りで単語出現数のカウント
val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordAndCountRDD.collect.foreach(println)
単語出現数をソートして上位3件を表示
val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordAndCountRDD.sortBy( _._2.toInt, ascending = false).take(3).foreach(println)
DataFrame
設定の読み込み
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
// spark2からはSparkSessioのほうが推奨だけどエラーで使えなかった
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Spark2.1ではSQLContextではなくSparkSessionの方を推奨しているけど、エラーが発生して利用できなかった。とりあえずSQLContextで試してみる。
RDDでファイルを読み込む
val dessertRDD = sc.textFile("data/dessert-menu.csv")
.map{ record =>
val splitRecord = record.split(",")
val menuId = splitRecord(0)
val name = splitRecord(1)
val price = splitRecord(2).toInt
val kcal = splitRecord(3).toInt
Dessert(menuId, name, price, kcal)
}
RDDからDataFrameへ変換
// RDD → DataFrame val dessertDF:DataFrame = sqlContext.createDataFrame(dessertRDD)
スキーマ定義の表示
println("show schema")
println(dessertDF.printSchema)
DataFrameに対してクエリを投げる。生成ずみのDataFrameに対して直接クエリを投げるのではなくワークテーブルを作成して、そちらに対してクエリを実行する。ATAN2などの組み込み関数については公式のドキュメントを確認する。
dessertDF.createOrReplaceTempView("dessert")
val numOver300KcalDF = sqlContext.sql(
"select count(*) as num_of_over_300Kcal FROM dessert where kcal >= 260"
)
println(numOver300KcalDF.show)
println(sqlContext.sql("select atan2(1, 3) as `ATAN2(1, 3)`").show)
DataFrameをjoinしてクエリを投げる。
val dessertOrderRDD = sc.textFile("data/dessert-order.csv")
.map{record =>
val splitRecord = record.split(",")
val sId = splitRecord(0)
val menuId = splitRecord(1)
val num = splitRecord(2).toInt
DessertOrder(sId, menuId, num)
}
val dessertOrderDF:DataFrame = sqlContext.createDataFrame(dessertOrderRDD)
dessertOrderDF.createOrReplaceTempView("desert_order")
println(sqlContext.sql(
"""
|select do.sId
| , d.name
| , do.num * d.price as amout_per_menu_per_slip
|from desert d
|join desert_order do on d.menuId = do.menuId
""".stripMargin).show