ApacheSparkの基本的なデータ操作
データ操作
Spark2.1で動作確認
build.sbt
IDEAから実行できるようにbuild.sbtに以下を追加。対象のライブラリが存在しない場合はmaven_centralで確認する。
val sparkVersion = "2.1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion % "compile", "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "compile" )
RDD
実行時の設定の読み込み
val conf = new SparkConf().setAppName("WordCountTop3").setMaster("local[*]") val sc = new SparkContext(conf)
ハイフン区切りで単語出現数のカウント
val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.collect.foreach(println)
単語出現数をソートして上位3件を表示
val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.sortBy( _._2.toInt, ascending = false).take(3).foreach(println)
DataFrame
設定の読み込み
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(conf) // spark2からはSparkSessioのほうが推奨だけどエラーで使えなかった val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Spark2.1ではSQLContextではなくSparkSessionの方を推奨しているけど、エラーが発生して利用できなかった。とりあえずSQLContextで試してみる。
RDDでファイルを読み込む
val dessertRDD = sc.textFile("data/dessert-menu.csv") .map{ record => val splitRecord = record.split(",") val menuId = splitRecord(0) val name = splitRecord(1) val price = splitRecord(2).toInt val kcal = splitRecord(3).toInt Dessert(menuId, name, price, kcal) }
RDDからDataFrameへ変換
// RDD → DataFrame val dessertDF:DataFrame = sqlContext.createDataFrame(dessertRDD)
スキーマ定義の表示
println("show schema") println(dessertDF.printSchema)
DataFrameに対してクエリを投げる。生成ずみのDataFrameに対して直接クエリを投げるのではなくワークテーブルを作成して、そちらに対してクエリを実行する。ATAN2などの組み込み関数については公式のドキュメントを確認する。
dessertDF.createOrReplaceTempView("dessert") val numOver300KcalDF = sqlContext.sql( "select count(*) as num_of_over_300Kcal FROM dessert where kcal >= 260" ) println(numOver300KcalDF.show) println(sqlContext.sql("select atan2(1, 3) as `ATAN2(1, 3)`").show)
DataFrameをjoinしてクエリを投げる。
val dessertOrderRDD = sc.textFile("data/dessert-order.csv") .map{record => val splitRecord = record.split(",") val sId = splitRecord(0) val menuId = splitRecord(1) val num = splitRecord(2).toInt DessertOrder(sId, menuId, num) } val dessertOrderDF:DataFrame = sqlContext.createDataFrame(dessertOrderRDD) dessertOrderDF.createOrReplaceTempView("desert_order") println(sqlContext.sql( """ |select do.sId | , d.name | , do.num * d.price as amout_per_menu_per_slip |from desert d |join desert_order do on d.menuId = do.menuId """.stripMargin).show