sparkからhiveを利用してみる
spark-shellにてクラスパスを指定する
spark-shell --driver-class-path 対象クラスパス
開発時にちょっと修正後にいちいちビルドしてデプロイして実行するのが面倒なので、インタラクティブシェルにて動作を確認後、ソースに反映の流れにしたい
hive
SQLを実行してみる
パッケージのインポートからselect文実行まで 以下のテーブルを使用するものとする
show create table sample; +-----------------------------------------------------------------+ createtab_stmt | +-----------------------------------------------------------------+ CREATE TABLE sample( | id int) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' | STORED AS INPUTFORMAT | 'org.apache.hadoop.mapred.TextInputFormat' | OUTPUTFORMAT | 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' | ... | +-----------------------------------------------------------------+
import org.apache.spark._ import org.apache.spark.sql.hive._ val hc = new HiveContext(sc) val select="select * from sample" val sqlResult = hc.sql(select) sqlResult.foreach(row=>println(row))
取得対象の絡むと型を指定する
sqlResult.foreach(row=>println(row.getAs[Int]("id")))
sqlの実行結果をmapで型にセットする
sqlResult.map(row => new IdsBean(row.getAs[Int]("id")))
この時変換先の型がserizableでないとエラーになるので、既存のjava資源でserizableをimplementしていない型にセットする場合は、 scalaの方で利用できるように拡張する必要がある
case class IdsSBean(id: Int) extends dto.IdsBean(id) with java.io.Serializable val idsRDD = sqlResult.map(row => new IdsSBean(row.getAs[Int]("id")))
RDDから配列に変換する
idsRDD.collect
RDDからSeqに変換する
idsRDD.collect.toSeq
summarizationsパターンを試してみる
簡単な数値の集計を行ってみたいと思います。
まず動作確認に使うデータを登録します。 テーブル作成
create table numerical_input( user_id int , input int );
動作確認に使うファイルをcsvファイルに保存してhdfsにアップロード ^Aは制御文字になっておりvimであればCtrl +V Ctrl + Aでファイルに入力できる
# vim numerical_input.txt
12345^A10 12345^A8 12345^A21 54321^A1 54321^A47 54321^A8 88888^A7 88888^A12
# hdfs dfs -put numerical_input.txt /input/
それからテーブルにデータを取り込む
load data inpath '/input/numerical_input.txt' into table numerical_input; select * from numerical_input;
scala> hc.sql("select * from numerical_input") res24: org.apache.spark.sql.DataFrame = [user_id: int, input: int] scala> val numericalRDD = hc.sql("select * from numerical_input") map { row => | (row.getAs[Int]("user_id"), row.getAs[Int]("input"), 1) | } scala> numericalRDD.show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |12345| 10| 1| |12345| 8| 1| |12345| 21| 1| |54321| 1| 1| |54321| 47| 1| |54321| 8| 1| |88888| 7| 1| |88888| 12| 1| +-----+---+---+
Datasetのapiを実行してみる
where
scala> numericalRDD.where($"_1" > 60000).show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |88888| 7| 1| |88888| 12| 1| +-----+---+---+
sort
scala> numericalRDD.sort($"_2").show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |54321| 1| 1| |88888| 7| 1| |12345| 8| 1| |54321| 8| 1| |12345| 10| 1| |88888| 12| 1| |12345| 21| 1| |54321| 47| 1| +-----+---+---+
scala側でデータが読み込めるようになったのでタプルの一番目にuser_idでグルーピングを行い、タプルに2番目の要素に最大値、3番目に最小値、4番目にカウント結果が入るようにしてみる。
scala> numericalRDD.groupBy($"_1" as "user_group").agg(max($"_2"), min($"_2"), count($"_3")).show +----------+-------+-------+---------+ |user_group|max(_2)|min(_2)|count(_3)| +----------+-------+-------+---------+ | 54321| 47| 1| 3| | 88888| 12| 7| 2| | 12345| 21| 8| 3| +----------+-------+-------+---------+