kaggleのTitanic問題をといてみる
kaggleでチュートリアルがわりに使われているTitanicの問題を解いてみて実際に行われている分析の流れを把握できるようにしたいと思います。 kaggleでは個人の解答が公開、議論されているので普段分析をしない人でも学習にはちょうど良さそうな気がします。
まずはデータの読み込み
import pandas as pd from pandas import Series,DataFrame # numpy, matplotlib, seaborn import numpy as np import matplotlib.pyplot as plt import seaborn as sns sns.set_style('whitegrid') %matplotlib inline # machine learning from sklearn.linear_model import LogisticRegression from sklearn.svm import SVC, LinearSVC from sklearn.ensemble import RandomForestClassifier from sklearn.neighbors import KNeighborsClassifier from sklearn.naive_bayes import GaussianNB # get titanic & test csv files as a DataFrame titanic_df = pd.read_csv("train.csv") test_df = pd.read_csv("test.csv") # preview the data
それから読み込んだ情報を確認してみます。
titanic_df.head()
titanic_df.info() <class 'pandas.core.frame.DataFrame'> RangeIndex: 891 entries, 0 to 890 Data columns (total 12 columns): PassengerId 891 non-null int64 Survived 891 non-null int64 Pclass 891 non-null int64 Name 891 non-null object Sex 891 non-null object Age 714 non-null float64 SibSp 891 non-null int64 Parch 891 non-null int64 Ticket 891 non-null object Fare 891 non-null float64 Cabin 204 non-null object Embarked 889 non-null object dtypes: float64(2), int64(5), object(5) memory usage: 83.6+ KB
Titanicの問題では学習データのcsvを読み込んで生き残った人(survived=1)を学習し、テストデータに対して分類を行うものとなっています。csvを読み込んだままの情報だと欠損があったり、不要な項目があったり、そのままでは分析に利用できない項目がありますので、実際に生存に影響のある項目だけが残すようにしていき機械学習して分類となります。
モデルの学習
とりあえず生存に明らかに必要のな誘うな項目は削除します。
titanic_df = titanic_df.drop(['PassengerId','Name','Ticket'], axis=1) test_df = test_df.drop(['Name','Ticket'], axis=1)
欠損が多い項目は削除しておきます。
titanic_df.drop("Cabin",axis=1,inplace=True) test_df.drop("Cabin",axis=1,inplace=True)
Embarkedの項目を処理する
Embarkedの項目を分析で扱いようにします。まず各項目がどんな値を取っているのか確認してみます。
titanic_df[["Embarked", "Survived"]].groupby(['Embarked'],as_index=False).mean()
C,Q,Sの3つの値取ることが確認できました。次に、titanic_df.info()
実行時にEmbarkedは2項目欠損があったのでとりあえずS
で埋めときます。
titanic_df["Embarked"] = titanic_df["Embarked"].fillna("S")
それからEmbarkedの項目が生存に影響しているかグラフ表示してみます。
sns.factorplot('Embarked','Survived', data=titanic_df,size=4,aspect=3)
Sに比べてCとQの方が生存確率は高いかもしれないといった感じなのでしょうか。S,C,Qのぞれぞれの累計値、生存確率も出してみます。
fig, (axis1,axis2,axis3) = plt.subplots(1,3,figsize=(15,5)) sns.countplot(x='Embarked', data=titanic_df, ax=axis1) sns.countplot(x='Survived', hue="Embarked", data=titanic_df, order=[1,0], ax=axis2) embark_perc = titanic_df[["Embarked", "Survived"]].groupby(['Embarked'],as_index=False).mean() sns.barplot(x='Embarked', y='Survived', data=embark_perc,order=['S','C','Q'],ax=axis3)
C,Qかどうかを判断するための項目を分析用にデータフレームに追加します。
embark_dummies_titanic = pd.get_dummies(titanic_df['Embarked']) embark_dummies_titanic.drop(['S'], axis=1, inplace=True) embark_dummies_test = pd.get_dummies(test_df['Embarked']) embark_dummies_test.drop(['S'], axis=1, inplace=True) titanic_df = titanic_df.join(embark_dummies_titanic) test_df = test_df.join(embark_dummies_test) titanic_df.drop(['Embarked'], axis=1,inplace=True) test_df.drop(['Embarked'], axis=1,inplace=True) titanic_df.head()
Fare(運賃)の項目を処理する
欠損は中央値で埋めときます。
test_df["Fare"].fillna(test_df["Fare"].median(), inplace=True)
それからデータがFloat64になっていたのでintに変換しておきます。でグラフ表示します。
titanic_df['Fare'] = titanic_df['Fare'].astype(int) test_df['Fare'] = test_df['Fare'].astype(int) titanic_df['Fare'].plot(kind='hist', figsize=(15,3),bins=100, xlim=(0,50))
生存者の運賃の平均と中央値を確認してみます。
fare_not_survived = titanic_df["Fare"][titanic_df["Survived"] == 0] fare_survived = titanic_df["Fare"][titanic_df["Survived"] == 1] avgerage_fare = DataFrame([fare_not_survived.mean(), fare_survived.mean()]) std_fare = DataFrame([fare_not_survived.std(), fare_survived.std()]) avgerage_fare.index.names = std_fare.index.names = ["Survived"] avgerage_fare.plot(yerr=std_fare,kind='bar',legend=False) std_fare.plot(yerr=std_fare,kind='bar',legend=False)
運賃も生存に影響があったということでデータフレームに残しておきます。
年齢、性別を処理する
まずは年齢のデータを確認する
欠損値はSurvive毎で平均 ± 標準偏差
の範囲の乱数を設定します。
average_age_titanic = titanic_df["Age"].mean() std_age_titanic = titanic_df["Age"].std() count_nan_age_titanic = titanic_df["Age"].isnull().sum() average_age_test = test_df["Age"].mean() std_age_test = test_df["Age"].std() count_nan_age_test = test_df["Age"].isnull().sum() rand_1 = np.random.randint(average_age_titanic - std_age_titanic, average_age_titanic + std_age_titanic, size = count_nan_age_titanic) rand_2 = np.random.randint(average_age_test - std_age_test, average_age_test + std_age_test, size = count_nan_age_test) titanic_df["Age"][np.isnan(titanic_df["Age"])] = rand_1 test_df["Age"][np.isnan(test_df["Age"])] = rand_2
Ageがfloat型だったのでint型に変換します。
titanic_df['Age'] = titanic_df['Age'].astype(int) test_df['Age'] = test_df['Age'].astype(int)
それからグラフ表示します。
fig, (axis1,axis2) = plt.subplots(1,2,figsize=(15,4)) axis1.set_title('Original Age values - Titanic') axis2.set_title('New Age values - Titanic') titanic_df['Age'].dropna().astype(int).hist(bins=70, ax=axis1) titanic_df['Age'].hist(bins=70, ax=axis2)
年齢別の生存率を表示します。
facet = sns.FacetGrid(titanic_df, hue="Survived",aspect=4) facet.map(sns.kdeplot,'Age',shade= True) facet.set(xlim=(0, titanic_df['Age'].max())) facet.add_legend() fig, axis1 = plt.subplots(1,1,figsize=(18,4)) average_age = titanic_df[["Age", "Survived"]].groupby(['Age'],as_index=False).mean() sns.barplot(x='Age', y='Survived', data=average_age)
子供の方が生存率が高いことがわかります。
次に性別も合わせて子供か、成人男性か、成人女性かで分けてSurviveに相関関係があるかみてみます。
def get_person(passenger): age,sex = passenger return 'child' if age < 16 else sex titanic_df['Person'] = titanic_df[['Age','Sex']].apply(get_person,axis=1) test_df['Person'] = test_df[['Age','Sex']].apply(get_person,axis=1) person_dummies_titanic = pd.get_dummies(titanic_df['Person']) person_dummies_titanic.columns = ['Child','Female','Male'] person_dummies_test = pd.get_dummies(test_df['Person']) person_dummies_test.columns = ['Child','Female','Male']
グラフで表示すると子供と女性が助かっていることがわかるのでデータフレームに残します。
fig, (axis1,axis2) = plt.subplots(1,2,figsize=(10,5)) sns.countplot(x='Person', data=titanic_df, ax=axis1) person_perc = titanic_df[["Person", "Survived"]].groupby(['Person'],as_index=False).mean() sns.barplot(x='Person', y='Survived', data=person_perc, ax=axis2, order=['male','female','child'])
person_dummies_titanic.drop(['Male'], axis=1, inplace=True) person_dummies_test.drop(['Male'], axis=1, inplace=True) titanic_df = titanic_df.join(person_dummies_titanic) test_df = test_df.join(person_dummies_test) titanic_df.drop(['Sex'],axis=1,inplace=True) test_df.drop(['Sex'],axis=1,inplace=True) titanic_df.drop(['Person'],axis=1,inplace=True) test_df.drop(['Person'],axis=1,inplace=True)
SibSp,Parchを処理する
SibSp, Parchは同伴者を表しているようで以下のように家族ずれかどうかを判断するように変換します。
titanic_df['Family'] = titanic_df["Parch"] + titanic_df["SibSp"] titanic_df['Family'].loc[titanic_df['Family'] > 0] = 1 titanic_df['Family'].loc[titanic_df['Family'] == 0] = 0 test_df['Family'] = test_df["Parch"] + test_df["SibSp"] test_df['Family'].loc[test_df['Family'] > 0] = 1 test_df['Family'].loc[test_df['Family'] == 0] = 0
Familyの項目に変換したので使わなくなったSibSp, Parchは削除します。
titanic_df = titanic_df.drop(['SibSp','Parch'], axis=1) test_df = test_df.drop(['SibSp','Parch'], axis=1)
それからグラフ表示したら家族ずれの方が生存確率がたかそうなのがわかるので残しておきます。
fig, (axis1,axis2) = plt.subplots(1,2,sharex=True,figsize=(10,5)) # sns.factorplot('Family',data=titanic_df,kind='count',ax=axis1) sns.countplot(x='Family', data=titanic_df, order=[1,0], ax=axis1) # average of survived for those who had/didn't have any family member family_perc = titanic_df[["Family", "Survived"]].groupby(['Family'],as_index=False).mean() sns.barplot(x='Family', y='Survived', data=family_perc, order=[1,0], ax=axis2) axis1.set_xticklabels(["With Family","Alone"], rotation=0)
Pclassを処理する
表示してみる
sns.factorplot('Pclass','Survived',order=[1,2,3], data=titanic_df,size=5) pclass_dummies_titanic = pd.get_dummies(titanic_df['Pclass']) pclass_dummies_titanic.columns = ['Class_1','Class_2','Class_3'] pclass_dummies_test = pd.get_dummies(test_df['Pclass']) pclass_dummies_test.columns = ['Class_1','Class_2','Class_3'] titanic_df.drop(['Pclass'],axis=1,inplace=True) test_df.drop(['Pclass'],axis=1,inplace=True)
pclassは1,2の場合に生存率高いことがわかったのでpclass1,2かどうか判定した結果をデータフレームに付与します。
pclass_dummies_test.drop(['Class_3'], axis=1, inplace=True) pclass_dummies_titanic.drop(['Class_3'], axis=1, inplace=True) titanic_df = titanic_df.join(pclass_dummies_titanic) test_df = test_df.join(pclass_dummies_test)
最終的にモデルはこのようになりました。
titanic_df.head()
機械学習
このように分析に影響のある項目だけが残るようにデータフレームを操作するのですが、最終的には機械学習で分類を行います。scikit-learn
なら簡単に使うことができるので良いと思います。ロジスティック回帰、ランダムフォレストであれば以下のようになります。
# ロジスティック回帰 X_train = titanic_df.drop("Survived",axis=1) Y_train = titanic_df["Survived"] logreg = LogisticRegression() logreg.fit(X_train, Y_train) Y_pred = logreg.predict(X_test) logreg.score(X_train, Y_train) 0.8058361391694725 # ランダムフォレスト random_forest = RandomForestClassifier(n_estimators=100) random_forest.fit(X_train, Y_train) Y_pred = random_forest.predict(X_test) random_forest.score(X_train, Y_train) 0.9640852974186308
今回はランダムフォレストの方がうまく分類できているようです。事前に意味のある情報に分類をしていたのでその場合はランダムフォレストの精度がよくなるのでしょうか。 kaggleのkernelをみたら実際の分析手順を追えるけど欠損の扱いや目的変数に対して影響があるのかを判断して説明できるようになるにはちゃんと勉強した方がよさそうな気がします。
sbt assemlby実行時にリソースフォルダを変更できるようにしたい
まずデフォルトのリソースフォルダを変更してみる
sbt assemblyで実行可能jarを作成する際通常は"src/main/resources"がリソースフォルダとして使われますが、build.sbtにresourceDirectory in Compile
の設定をすることによりデフォルトのリソースフォルダを変更することができます。以下では標準のリソースフォルダがsrc/main/resources/developmentになるように変更しています。
lazy val app = (project in file(".")). settings(assemblySettings: _*). settings(resourceDirectory in Compile := baseDirectory.value / "src" / "main" / "resources" / "development")
sbt assembly実行時に動的にリソースフォルダを変更してみる
例えば以下のようなフォルダの構成でリソースフォルダをビルド時に変更したい場合
src/main/resources ├── development: 開発時に使用するリソース ├── production: 本番環境リリース時のリソース └── staging: ステージ環境リリース時のリソース
この場合、最初はresourceDirectoryを変更するタスクを作ってassemblyと連続して呼び出せば大丈夫なのかと思ったのですがタスク間ではresourceDirectoryの変更などの副作用が起きないような動きをしていて困ったのですが、sbtにはaddCommandAliasというものがあり事前にset resourceDirectory in Compile
すれば大丈夫そうでした。結果ですが以下のbuild.sbtの設定ではsbt assembly
ではデフォルトのリソースフォルダをsbt prodEnvAssembly
ではproduction環境のリソースフォルダをsbt stgEnvAssemlby
ではstaging環境のリソースフォルダを使うようになります。
lazy val app = (project in file(".")). settings(resourceDirectory in Compile := baseDirectory.value / "src" / "main" / "resources" / "development") addCommandAlias("prodEnvAssembly", ";set resourceDirectory in Compile := baseDirectory.value / \"src\" / \"main\" / \"resources\" / \"production\"; assembly") addCommandAlias("stgEnvAssembly", ";set resourceDirectory in Compile := baseDirectory.value / \"src\" / \"main\" / \"resources\" / \"staging\"; assembly")
Sparkで状態を持つobjectを使い回す方法について調べてみた
Sparkでの開発時に既存のjava資源を使用するのはよくあると思うけど、objectが状態を持っていて使い回す必要がある場合も考えられるのでその場合どうすれば良いのか調べてみました。
まず以下のように引数で与えた値だけ内部のカウンターを増やすクラスがあったとする。
class MyCounter extends java.io.Serializable{ private var count = 0 def countUp(up: Int): Unit ={ this.count += up } def getCount = this.count }
このクラスに対して1~10の設定でそれぞれの設定によりカウントアップする数値が決まる使い方を想定する。このクラスに対してログを読み込むたびにカウントアップのメソッドを呼び出したいけど、設定を引数として渡すものになっているので1の設定なら次カウントアップする際も1の設定を維持するといった必要があります。
まず設定とカウンターobjectをRDDにしてmap処理にてカウントアップ後自分自身を返すようにしてみます。この場合は想定通り、map後にカウントアップ後の結果が入っていることが確認できます。
val configRdd = sc.makeRDD(1 to 10) val counterRdd = configRdd.map((new MyCounter, _)) val afterCountup = counterRdd.map{ exeConfig => exeConfig._1.countUp(exeConfig._2) exeConfig } afterCountup.foreach{ exeConfig => println(exeConfig._1.getCount) }
次に一つのcounterオブジェクトをログのRDDのmap処理内でカウントアップしてみる。
これはtask内でobjectが共有されるためcount=1の結果が100個にならない
自分の環境だとmap後の100個のカウンターが0~50の値を取っていた。map処理内でtaskが2個に別れたのか。
val myc = new MyCounter val logRdd = sc.makeRDD(1 to 100) val newc = logRdd.map { log => myc.countUp(1) myc } newc.foreach{row => println(row.getCount) }
foreach内でのカウントアップだが、これだとforeachの中と外では別のobjectのためカウントアップの結果はforeachの外には反映されていない。
val myc = new MyCounter val logRdd = sc.makeRDD(1 to 100) logRdd.foreach { log => myc.countUp(1) } println(myc.getCount)
並列処理とは関係なくなるけど配列のforeach処理内でカウンターobjectをカウントアップするのであれば想定通りカウントアップする。
val myc = new MyCounter val logRdd = sc.makeRDD(1 to 100) logRdd.collect.foreach { log => myc.countUp(1) } println(myc.getCount)
1 ~ 10の設定のobjectに対してlogを読み込んで状態を持つobjectでカウントアップする場合は、カウンターのobjectをRDDにしてそれからmapで配列に変換したログのforeachでカウントアップするようにしたら良さそう。この場合はRDDとして使用するカウンターobjectの数だけ並列に処理することになるのか。
val configRdd = sc.makeRDD(1 to 10) val counterRdd = configRdd.map((new MyCounter, _)) val logRdd = sc.makeRDD(1 to 100) val logRddArray = logRdd.collect val logRddArrayBroadcast = sc.broadcast(logRddArray) val countUp = counterRdd.map{ exeConfig => logRddArrayBroadcast.value.foreach{log=> exeConfig._1.countUp(exeConfig._2) } exeConfig } countUp.foreach { exeConfig => println(exeConfig._1.getCount) }
ApacheSparkで扱うobjectのSerializableの必要性について
ApacheSparkで扱うobjectのSerializableの必要性について
hiveやファイルからデータを読み込んだ直後値はRDD, Dataset, DataFrameになっていて、少ないデータに対して何回もfilter処理を行う必要がある場合に一旦collectして配列に変換しdriver内で処理したい場合もあると思うけど、データを読み込んだ際にnon-serializableなクラスに値をセットしていたらcollectで配列への変換時にエラーが発生したのでその際のメモ
例えば以下のようなjavaのクラスがあったとして、これをsparkで利用するとする
public class IdsBean { private int id; public IdsBean(int id) { /* compiled code */ } public int getId() { /* compiled code */ } public void setId(int id) { /* compiled code */ } }
scala> val rdd = sc.makeRDD(1 to 10) scala> val rddBean = rdd.map(new IdsBean(_))
foreachやmapの処理は行える
scala> rddBean.foreach{ row => println(row.getId) } 6 7 8 9 10 1 2 3 4 5 scala> rddBean.map(row=>row).foreach(row=>println(row.getId)) 1 2 3 4 5 6 7 8 9 10
ただcollectの処理はエラーが発生する
scala> rddBean.collect [Stage 10:> (0 + 0) / 2]17/10/07 13:43:41 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 20) java.io.NotSerializableException: dto.IdsBean Serialization stack: - object not serializable (class: dto.IdsBean, value: dto.IdsBean@2872403e) - element of array (index: 0) ...
collectの処理ではRDDは各executorに分散されていてそれをdriverに集めるのだが、その際にserialize → deserializeの処理が実行されるのだが、対象のクラスがnon-serializableの場合は シリアライズできないのでcollectの処理を実行する時点でエラーが発生するようだ。
対応策として以下のように対象のクラスを継承してserializableにすることを考えたがそれではダメだった。
scala> class IdsBeanS(id: Int) extends dto.IdsBean(id) with java.io.Serializable
これについてはjava.io.Serializableのjavadocでコメントがあり、non-serializableなクラスのsubtypeをSerilizableにする場合はスーパークラスのpublic, protectedなメンバ変数がシリアライズの対象となるようだ。
To allow subtypes of non-serializable classes to be serialized, the subtype may assume responsibility for saving and restoring the state of the supertype's public, protected, and (if accessible) package fields. The subtype may assume this responsibility only if the class it extends has an accessible no-arg constructor to initialize the class's state. It is an error to declare a class Serializable if this is not the case. The error will be detected at runtime.
また、executor-driver間でobjectを共有する場合はSerializableでになっている必要がわかった。dirver-executor間で値を共有するタイミングだがmapやforeach、collectなどのapiを実行時のようで、シリアライズ、でシリアライズの対象になっているobjectはapiによっても違っているようだ。 例えば先ほどのnon-serializableなRDDについて以下の処理であれば問題ない。
scala> rddBean.foreach{ row => println(row.getId) }
だが、mapやforeachの処理内でnon-sirializableなクラスを使用しようとしたらエラーが発生する。mapやforeachの内部で使うobjectは各executorに送られるのでserializableである必要があるようだ。
scala> val rdd = sc.makeRDD(1 to 10) scala> rdd.foreach{rddRow => | rddBean.foreach(beanRow => println(beanRow.getId)) | } 17/10/07 14:09:03 ERROR executor.Executor: Exception in task 0.0 in stage 13.0 (TID 26) org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
いつexecutor間にobjectが送られるのかは知っていた方が良いけど、基本的にはSerializableなobjectを使うのが安全そう。 あと同一のexecutorにobjectが送られたとしてもtask内でシリアライズ、デシリアライズして使用するからtask間で同じobjectを参照するということはないらしい。 (基本的なことだけど、object自体が状態を持っていてスレッドセーフでなければ問題が発生するかもしれない) https://twitter.com/maropu/status/889747740858568704
Scalaでseqを操作してみる
scalaでSeqを操作してみる
まず以下のcase classがあったとし、
case class Element(id: Int, time: java.sql.Timestamp)
初期のデータとして以下を保持する
val elementSeq = Array( Element(1, new java.sql.Timestamp(new DateTime(2017, 8, 10, 16, 13).getMillis)) , Element(2, new java.sql.Timestamp(new DateTime(2017, 8, 9, 11, 5).getMillis)) , Element(3, new java.sql.Timestamp(new DateTime(2017, 5, 22, 9, 13).getMillis)) , Element(4, new java.sql.Timestamp(new DateTime(2017, 9, 1, 22, 13).getMillis)) , Element(5, new java.sql.Timestamp(new DateTime(2017, 7, 31, 23, 13).getMillis)) , Element(6, new java.sql.Timestamp(new DateTime(2017, 8, 15, 12, 7).getMillis)) ).toSeq
ソート
Timestampの降順でソートするのは以下のようになる。TimestampはComparableをimplementsしていないので、ソートする際はgetTimeでミリ秒に変換などする
val sortedArray = elementSeq.sortBy(-_.time.getTime) sortedArray.foreach(println)
unionでseqを結合する。
val elementSeq2 = Array( Element(1, new java.sql.Timestamp(new DateTime(2017, 8, 12, 15, 11).getMillis)) , Element(2, new java.sql.Timestamp(new DateTime(2017, 8, 7, 6, 5).getMillis)) , Element(3, new java.sql.Timestamp(new DateTime(2017, 5, 9, 16, 13).getMillis)) , Element(4, new java.sql.Timestamp(new DateTime(2017, 9, 1, 23, 59).getMillis)) , Element(5, new java.sql.Timestamp(new DateTime(2017, 7, 31, 23, 12).getMillis)) , Element(6, new java.sql.Timestamp(new DateTime(2017, 8, 14, 23, 4).getMillis)) ).toSeq val unionTimeSeq = elementSeq.map(_.time) .union(elementSeq2.map(_.time)) .sortBy(_.getTime) unionTimeSeq.foreach(println)
要素の追加とフィルター
次にstartDateで指定した日付以降でフィルターして昇順でソートするのは以下のようになる。
val startDate = new java.sql.Timestamp(new DateTime(2017, 6, 1, 0, 0).getMillis) val timeArray = (startDate +: unionTimeSeq) .filter(_.getTime >= startDate.getTime) .sortBy(_.getTime) timeArray.foreach(println)
指定日の直前のレコードの取り出し
日付のSeqでmapし、事前に日付の降順でソートされたSeqから指定日以前の一番新しいレコードを取り出す処理は以下のようになる。
val validRecordSeq = timeArray.map{time=> val record = sortedArray .filter(_.time.getTime <= time.getTime) .headOption.getOrElse(Element(0, new java.sql.Timestamp(new DateTime(2017, 6, 1, 0, 0).getMillis))) (time, record) } validRecordSeq.foreach(println)
sparkからhiveを利用してみる
spark-shellにてクラスパスを指定する
spark-shell --driver-class-path 対象クラスパス
開発時にちょっと修正後にいちいちビルドしてデプロイして実行するのが面倒なので、インタラクティブシェルにて動作を確認後、ソースに反映の流れにしたい
hive
SQLを実行してみる
パッケージのインポートからselect文実行まで 以下のテーブルを使用するものとする
show create table sample; +-----------------------------------------------------------------+ createtab_stmt | +-----------------------------------------------------------------+ CREATE TABLE sample( | id int) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' | STORED AS INPUTFORMAT | 'org.apache.hadoop.mapred.TextInputFormat' | OUTPUTFORMAT | 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' | ... | +-----------------------------------------------------------------+
import org.apache.spark._ import org.apache.spark.sql.hive._ val hc = new HiveContext(sc) val select="select * from sample" val sqlResult = hc.sql(select) sqlResult.foreach(row=>println(row))
取得対象の絡むと型を指定する
sqlResult.foreach(row=>println(row.getAs[Int]("id")))
sqlの実行結果をmapで型にセットする
sqlResult.map(row => new IdsBean(row.getAs[Int]("id")))
この時変換先の型がserizableでないとエラーになるので、既存のjava資源でserizableをimplementしていない型にセットする場合は、 scalaの方で利用できるように拡張する必要がある
case class IdsSBean(id: Int) extends dto.IdsBean(id) with java.io.Serializable val idsRDD = sqlResult.map(row => new IdsSBean(row.getAs[Int]("id")))
RDDから配列に変換する
idsRDD.collect
RDDからSeqに変換する
idsRDD.collect.toSeq
summarizationsパターンを試してみる
簡単な数値の集計を行ってみたいと思います。
まず動作確認に使うデータを登録します。 テーブル作成
create table numerical_input( user_id int , input int );
動作確認に使うファイルをcsvファイルに保存してhdfsにアップロード ^Aは制御文字になっておりvimであればCtrl +V Ctrl + Aでファイルに入力できる
# vim numerical_input.txt
12345^A10 12345^A8 12345^A21 54321^A1 54321^A47 54321^A8 88888^A7 88888^A12
# hdfs dfs -put numerical_input.txt /input/
それからテーブルにデータを取り込む
load data inpath '/input/numerical_input.txt' into table numerical_input; select * from numerical_input;
scala> hc.sql("select * from numerical_input") res24: org.apache.spark.sql.DataFrame = [user_id: int, input: int] scala> val numericalRDD = hc.sql("select * from numerical_input") map { row => | (row.getAs[Int]("user_id"), row.getAs[Int]("input"), 1) | } scala> numericalRDD.show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |12345| 10| 1| |12345| 8| 1| |12345| 21| 1| |54321| 1| 1| |54321| 47| 1| |54321| 8| 1| |88888| 7| 1| |88888| 12| 1| +-----+---+---+
Datasetのapiを実行してみる
where
scala> numericalRDD.where($"_1" > 60000).show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |88888| 7| 1| |88888| 12| 1| +-----+---+---+
sort
scala> numericalRDD.sort($"_2").show +-----+---+---+ | _1| _2| _3| +-----+---+---+ |54321| 1| 1| |88888| 7| 1| |12345| 8| 1| |54321| 8| 1| |12345| 10| 1| |88888| 12| 1| |12345| 21| 1| |54321| 47| 1| +-----+---+---+
scala側でデータが読み込めるようになったのでタプルの一番目にuser_idでグルーピングを行い、タプルに2番目の要素に最大値、3番目に最小値、4番目にカウント結果が入るようにしてみる。
scala> numericalRDD.groupBy($"_1" as "user_group").agg(max($"_2"), min($"_2"), count($"_3")).show +----------+-------+-------+---------+ |user_group|max(_2)|min(_2)|count(_3)| +----------+-------+-------+---------+ | 54321| 47| 1| 3| | 88888| 12| 7| 2| | 12345| 21| 8| 3| +----------+-------+-------+---------+
Hiveの環境構築
Hive環境構築
インストール
1.javaのインストール
7系のjavaをインストールしてパスを通しておきます。
export JAVA_HOME=/usr/local/jdk1.7.0_71 export PATH=$PATH:$JAVA_HOME/bin
2.Hadoopのインストール
hadoop version
パスの設定
export HADOOP_HOME=/usr/local/hadoop export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin
core-site.xmlにNameNodeの情報を設定する
<configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> </configuration>
hdfs-site.xmlを編集してNameNode,DataNodeのデータ保存先を設定する
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:///home/hadoop/hadoopinfra/hdfs/namenode</value> </property> <property> <name>dfs.namenode.data.dir</name> <value>file:///home/hadoop/hadoopinfra/hdfs/datanode</value > </property> </configuration>
yarn-site.xmlを編集yarn.nodemanager.aux-servicesにmapreduce_shuffleを設定する
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
map-red-site.xmlを変数しmapreduce.framework.nameにyarnを設定する
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
NameNodeをフォーマットする
hdfs namenode -format
3.Hiveのインストール
Hiveをダウンロードする。インストール済みのhadoopにあっているバージョンを選びます。 https://hive.apache.org/downloads.html ダウンロード後に解凍します。それから、パスを通します。
export HIVE_HOME=/opt/ export PATH=$PATH:$HIVE_HOME/bin export CLASSPATH=$CLASSPATH:$HADOOP_HOME/lib/*:. export CLASSPATH=$CLASSPATH:$HIVE_HOME/lib/*:.
hive-env.shを有効にする
cd $HIVE_HOME/conf cp hive-env.sh.template hive-env.sh
hiveのメタ情報保存先の設定
今回はPostgreSQLにhiveのめた情報を保存するようにします。 postgresqlをインストールします。hiveインストール環境からアクセスできるようにしておきます。
yum install postgresql-server postgresql-setup initdb systemctl start postgresql systemctl enable postgresql
ドライバをhiveのlibに移動します。
wget https://jdbc.postgresql.org/download/postgresql-9.3-1103.jdbc4.jar mv postgresql-9.3-1103.jdbc4.jar /opt/hive-0.12.0/lib/
PostgreSQLにHiveで使うユーザとDBを作成します。
createuser -P hive createdb -O hive hive
メタ情報保存に使スキーマを実行します /opt/hive-0.12.0/scripts/metastore/upgrade/postgres/hive-schema-0.12.0.postgres.sql
hive-site.xmlを編集する
<property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:postgresql://ポスグレインストール先:5432/hive</value> <description>JDBC connect string for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>org.postgresql.Driver</value> <description></description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>hive</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>hive</value> <description>password to use against metastore database</description> </property>
動作確認
# hive hive> show tables; OK Time taken: 3.323 seconds
Hiveserver2を起動してbeelineで接続してみる
hive-site.xmlを変数する 今回は動作確認のため認証を行わなくても接続できるようにする。
<property> <name>hive.server2.authentication</name> <value>NOSASL</value> <!-- default NONE is for SASLTransport --> </property> <property> <name>hive.server2.enable.doAs</name> <value>false</value> <!-- Execute query as hiveserver2 process user --> </property>
hiveserver2を起動する
$HIVE_HOME/bin/hiveserver2 &
beelineで接続してみる
# beeline !connect jdbc:hive2://localhost:10000/default;auth=noSasl hive org.apache.hive.jdbc.HiveDriver
※接続先のポートはhive-site.xmlのhive.server2.thrift.portを確認する
hiveqlを実行してみる
データ登録の確認のためまず以下のようなテキストファイルを作成しhdfsに上げておく
1 2 3 4 5
それからbeelineで接続し以下を実行する
create table sample ( id INT ); load data inpath 'ファイルパス' into table sample; select * from sample;
インサートしたレコードはhdfs上にあるのが以下のコマンドで確認できる
hdfs dfs -ls hive-site.xmlでhive.metastore.warehouse.dirに指定しているパス/DB名
またメタ情報がPostgreSQLに保存されていることも確認できる、例えばテーブル名の情報が保存される
hive=# \d "TBLS" TBL_ID | bigint | not null CREATE_TIME | bigint | not null DB_ID | bigint | LAST_ACCESS_TIME | bigint | not null OWNER | character varying(767) | default NULL::character varying RETENTION | bigint | not null SD_ID | bigint | TBL_NAME | character varying(128) | default NULL::character varying TBL_TYPE | character varying(128) | default NULL::character varying VIEW_EXPANDED_TEXT | text | VIEW_ORIGINAL_TEXT | text | hive=# select * from "TBLS"; 1 | 1506262292 | 1 | 0 | root | 0 | 1 | sample | MANAGED_TABLE | |
Apache SparkからHiveを利用する
spark-shellでインタラクティブシェルから実行してみる
spark-shellコマンドを実行することでインタラクティブにsparkを実行することができます。
# spark-shell
spark-shell実行時に以下のようなエラーが出た場合は、
org.apache.hadoop.hive.metastore.api.MetaException: Hive Schema version 1.2.0 does not match metastore's schema version 0.12.0 Metastore is not upgraded or corrupt
hive-site.xmlのhive.metastore.schema.verificationにfalseを指定することでうまくいくようになるかもしれないです。
<property> <name>hive.metastore.schema.verification</name> <value>false</value> <description> </description> </property>
spark-shellではscにSparkContextがセットされている
scala> sc res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@519e67e
import org.apache.spark._ import org.apache.spark.sql.hive._ val hc = new HiveContext(sc) val select="select * from sample" val sqlResult = hc.sql(select) sqlResult.foreach(row=>println(row))
次に必要なクラスのインポート後hiveのコンテキストを初期化してみます。
scala> import org.apache.spark._ import org.apache.spark._ scala> import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive._ scala> val hc = new HiveContext(sc) warning: there was one deprecation warning; re-run with -deprecation for details hc: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@8167f57
試しにselectを実行してみます。
scala> val select="select * from sample" select: String = select * from sample scala> val sqlResult = hc.sql(select) sqlResult: org.apache.spark.sql.DataFrame = [id: int] scala> sqlResult.foreach(row=>println(row)) [Stage 0:> (0 + 2) / 2] [1] ...
pysparkから実行してみる
次にpysparkからpythonスクリプトでhiveを利用してみたいと思います。
pyspark
pysparkでもspark-shellと同様にscにSparkContenxtがセットされています。
>>> sc <pyspark.context.SparkContext object at 0x11029d0>
HiveContextをインポートしてコンテキストを初期化します。
>>> from pyspark.sql import HiveContext >>> sqlContext = HiveContext(sc)
それからSQLを実行してみます。
>>> sqlContext.sql("select * from sample").show() +---+ | id| +---+ | 1| | 2| | 3| | 4| | 5| +---+
pysparkからでもhiveに接続してデータを取ってこれることが確認できました。