sparkからhiveを利用してみる

spark-shellにてクラスパスを指定する

spark-shell --driver-class-path 対象クラスパス

開発時にちょっと修正後にいちいちビルドしてデプロイして実行するのが面倒なので、インタラクティブシェルにて動作を確認後、ソースに反映の流れにしたい

hive

SQLを実行してみる

パッケージのインポートからselect文実行まで 以下のテーブルを使用するものとする

show create table sample;
+-----------------------------------------------------------------+
                        createtab_stmt                          |
+-----------------------------------------------------------------+
CREATE  TABLE sample(                                           |
  id int)                                                       |
ROW FORMAT SERDE                                                |
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'          |
STORED AS INPUTFORMAT                                           |
  'org.apache.hadoop.mapred.TextInputFormat'                    |
OUTPUTFORMAT                                                    |
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  |
  ...                                                           |
+-----------------------------------------------------------------+
import org.apache.spark._
import org.apache.spark.sql.hive._
val hc = new HiveContext(sc)

val select="select * from sample"
val sqlResult = hc.sql(select)
sqlResult.foreach(row=>println(row))

取得対象の絡むと型を指定する

sqlResult.foreach(row=>println(row.getAs[Int]("id")))

sqlの実行結果をmapで型にセットする

sqlResult.map(row => new IdsBean(row.getAs[Int]("id")))

この時変換先の型がserizableでないとエラーになるので、既存のjava資源でserizableをimplementしていない型にセットする場合は、 scalaの方で利用できるように拡張する必要がある

case class IdsSBean(id: Int) extends dto.IdsBean(id) with java.io.Serializable
val idsRDD = sqlResult.map(row => new IdsSBean(row.getAs[Int]("id")))

RDDから配列に変換する

idsRDD.collect

RDDからSeqに変換する

idsRDD.collect.toSeq

summarizationsパターンを試してみる

簡単な数値の集計を行ってみたいと思います。

まず動作確認に使うデータを登録します。 テーブル作成

create table numerical_input(
  user_id int
  , input int
);

動作確認に使うファイルをcsvファイルに保存してhdfsにアップロード ^Aは制御文字になっておりvimであればCtrl +V Ctrl + Aでファイルに入力できる

# vim numerical_input.txt

12345^A10
12345^A8
12345^A21
54321^A1
54321^A47
54321^A8
88888^A7
88888^A12

# hdfs dfs -put numerical_input.txt /input/

それからテーブルにデータを取り込む

load data inpath '/input/numerical_input.txt' into table numerical_input;
select * from numerical_input;

次にscalaでデータをRDDとして読み込んでみる

scala> hc.sql("select * from numerical_input")
res24: org.apache.spark.sql.DataFrame = [user_id: int, input: int]

scala> val numericalRDD = hc.sql("select * from numerical_input") map { row =>
     | (row.getAs[Int]("user_id"), row.getAs[Int]("input"), 1)
     | }

scala> numericalRDD.show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|12345| 10|  1|
|12345|  8|  1|
|12345| 21|  1|
|54321|  1|  1|
|54321| 47|  1|
|54321|  8|  1|
|88888|  7|  1|
|88888| 12|  1|
+-----+---+---+

Datasetのapiを実行してみる

where

scala> numericalRDD.where($"_1" > 60000).show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|88888|  7|  1|
|88888| 12|  1|
+-----+---+---+

sort

scala> numericalRDD.sort($"_2").show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|54321|  1|  1|
|88888|  7|  1|
|12345|  8|  1|
|54321|  8|  1|
|12345| 10|  1|
|88888| 12|  1|
|12345| 21|  1|
|54321| 47|  1|
+-----+---+---+

scala側でデータが読み込めるようになったのでタプルの一番目にuser_idでグルーピングを行い、タプルに2番目の要素に最大値、3番目に最小値、4番目にカウント結果が入るようにしてみる。

scala> numericalRDD.groupBy($"_1" as "user_group").agg(max($"_2"), min($"_2"), count($"_3")).show
+----------+-------+-------+---------+
|user_group|max(_2)|min(_2)|count(_3)|
+----------+-------+-------+---------+
|     54321|     47|      1|        3|
|     88888|     12|      7|        2|
|     12345|     21|      8|        3|
+----------+-------+-------+---------+