Haskell学習中のメモ

関数を定義する

関数名 :: 引数の型、 -> 戻り値の型 とすることで指定した型の引数を受け取り戻り値の型を返す関数を定義できる。以下のように指定すると7を受け取った場合は"LUCKY NUMBER SEVEN!"を返し、それ以外の場合は"Sorry, youre out of luck, pal!"を返すようになる。引数に対して何を返すかは定義された引数の順番でパターンマッチして最初に見つかったものを返すようにしている。そのためlucky x が先に定義されているのであれば毎回"Sorry, youre out of luck, pal!"を返すようになる。

lucky :: Int -> String
lucky 7 = "LUCKY NUMBER SEVEN!"
lucky x = "Sorry, youre out of luck, pal!"

高階関数

Haskellでは関数を引数として受け取ったり、戻り値として関数を返すことができて、このような関数を高階関数と呼びます。このようなことができるのはHaskellの関数にカリー化の機能が備わっており、全ての関数は引数を一つ受け取り引数が一つ指定された関数または戻り値を返すものと見なすことが出来る。 例えば以下のような関数があったとする。

multiThree :: Int -> Int -> Int -> Int
multiThree x y z = x * y * z

この関数を利用して引数2つを受けとる関数を以下のように定義することが出来る。

multiTwoNine :: Int -> Int -> Int
multiTwoNine = multiThree 9

multiTwoNine = multiThree 9の部分で2つの引数と9を掛け合わせる関数をmultiTwoNineに代入している。

関数を引数として受け取る場合は以下のように定義できる。 applyTwice :: (a -> a) -> a -> aでは'引数を一つ受け取り同じ型を戻りとして返す関数'と'引数に使う関数の引数の型'を受け取り'引数に使う関数の戻り値の型'を返す関数として扱われる。以下ではmultiTwoNine 5で生成した関数をapplyTwiceの第一引数として利用している。applyTwice f x = f ( f x )の部分は関数合成でapplyTwiceに渡した関数を2回連続で実行(2度目の実行は1度目の結果を引数として受け取る)するようになる。

applyTwice :: (a -> a) -> a -> a
applyTwice f x = (f . f ) x
print $ applyTwice (multiTwoNine 5) 10

ラムダ式

ラムダ式による無名関数を利用することもできる。例えば引数2つと9を掛け合わせるラムダ式であれば、以下のようになる。下の(\x y -> x * y * 9)の部分がラムダ式になる。

print $ (\x y -> x * y * 9) 5 4

map

リストに関しては関数の逐次処理をmapで行うことが出来る。

print $ map (applyTwice (multiTwoNine 5)) [10, 11, 12, 13]

型、型クラス

Haskellの型システムには以下のような機能がある
- 型チェック
 引数の型と戻り値の型から式が出し以下を静的にチェックする
- 多相性
 再利用性を高めるためhaskellの型には多層
 以下のように汎用の型t, t1を使っている(パラメータ多層)場合は、実行時に具体的な型が決まる動きをする
 1 == 1"OCaml" == "OCaml"のように同じ関数名で別の実装を定義できるアドホック多層がある(オブジェクト指向オーバーロードに相当する)
- 型推論
型が不明なσ機に対してなるべく汎用的な型を割り当てるようにする
 

型コンストラクタと型引数

型の記述に使われる識別子を型コンストラクタという。Maybeなど引数を受け取る型コンストラクタについて、Intを与えてMaybe IntとするときこのIntのような型コンストラクタに与える引数を型引数と呼ぶ。型コンストラクタが型引数が必要かどうかはGHCiで:k 型コンストラクタを実行することで確認できるMaybeの場合はMaybe :: * -> *になり、これは任意の型が型引数として必要なことを表している。Intの場合はInt :: *でこれは型引数が必要でないことを表している。この:kの実行結果をカインドという。

型変数

idやheadのように型が異なっていても同じ実装を使いまわせる関数は型変数を用いて実装されている。:tで確認するとそれぞれ以下のようになる。 id :: a -> a, head :: [a] -> aここではa, [a]が型変数で使われており、通常は小文字1文字が使われることが多い。

型制約

特定の型クラスに所属する型のみ引数として受け取れるように制限でき、これを型制約と言います。例えばshow関数の型制約は以下のようになっています。
show :: Show a => a -> String
これはShow型クラスに属しているクラスのみがshowメソッドの引数として使用できるという制約になる。 自作の型クラスをShow型クラスに所属させる場合は以下のようにderivingで指定すれば良い。
data MyData = I Int deriving (Show)

代数的データ型を定義する

代数データ型により型同士を組み合わせて新しい型を作ることが出来る。代数データ型の定義にはデータコンストラクタを使用する。例えばInt型、Bool型、String型の3つのフィールドをもちデータコンストラクタにNewEmployeeを使用するEmployee型は以下のように定義できる。
data Employee = NewEmployee Integer Bool String
以下のように定義することもできる。
data Employee = NewEmployee {age :: Integer, isManager :: Bool, name :: String}
インスタンスの生成及びデータの取り出しは以下のように行える。

ghci>employee = NewEmployee 30 False "abc"
ghci>NewEmployee age isManager name = employee
ghci>name
"abc"

複数のデータコンストラクタを持つデータ型の定義

データコンストラクタが複数ある場合は |つなぎで定義できる。
data CmdOption = COptInt Integer | COptBool Bool | COptStr String

データの正格性フラグ

代数データ型ではデータコンストラクタの引数がデフォルトで非正格となっている。非性格の引数については値が入っていなくてもコンストラクタの結果を評価できるが、正格な引数がundefinedの場合はデータコンストラクタの結果を利用することができない。

data LazyAndStrict = LazyAndStrict { lsLazy :: Int, lsStrict :: !Int }

ここではlsStrictが非正格なので以下のようにlsStrictがundefinedの場合はlsLazyを取得するのにも失敗する。

ghci>lsStrict $ LazyAndStrict undefined 2
2
ghci>lsLazy $ LazyAndStrict 1 undefined
*** Exception: Prelude.undefined
CallStack (from HasCallStack):
  error, called at libraries/base/GHC/Err.hs:79:14 in base:GHC.Err
  undefined, called at <interactive>:143:26 in interactive:Ghci44

フィールドの値の差し替え

Haskellの変数は全てイミュータブルであり生成済みのデータ型に対して値を更新するということができず、フィールドの値を差し替えたい場合はコピーして生成することになります。 例えば生成済みのEmployee型のデータに対して年齢を一つ増やしたデータを生成する場合は以下のようなことが出来る。

employee' :: Employee -> Employee
employee' employee = employee {employeeIsManager = True
, employeeAge = employeeAge employee + 1 }

型シノニム

typeキーワードを使うことで型に別名をつけることが出来る。これを型シノニムという。Integer型にAgeという別名をつける場合は以下のようになる。 type Age = Integer

newtype

型シノニムと似た概念としてnewtypeがある。型シノニムとの違いとして、型シノニムは既存のクラスの別名として使用する(Integerの型シノニムとしてAgeを定義したらAge型の値はIntegerが引数の関数に利用できる。)がnewtypeは別のクラスとして定義される。またコンストラクタ名が必要になるという違いもある。

ghci>newtype NTIndexed a = NewNTIndexed (Integer, a) deriving Show
ghci>x = NewNTIndexed (11, "eleven")
ghci>:type x
x :: NTIndexed [Char]
ghci>x
NewNTIndexed (11,"eleven")

型の別名だけを外部に公開すると、内部の実装を隠蔽し実装の修正が行いやすくなる。

型クラス

javaでいうインターフェースのようなもので、例えばserializableを実装したクラスから生成したインスタンスシリアライズ可能なことを表すように、Show型クラスに属するデータ型の値はshowメソッドの引数として利用できる。型クラスは以下のように定義できる。

class 型クラス名 関数名 where
  関数名1 :: 型名1
  関数名1のデフォルト実装
  関数名2 :: 型名2
  ...

型クラスの定義では各関数でのデフォルト実装を定義しますが、データの型毎での関数を定義したい場合はインスタンスの定義を行います。以下のようなフォーマットになります。

instance 型クラス名 型名 ( インスタンス名 ) where
  関数名 = 式

例えば以下のような代数データ型を定義しておいて

data Human = Human String deriving (Show, Read, Eq)
data Dog = Dog deriving (Show, Read, Eq)
data Cat = Cat deriving (Show, Read, Eq)

型クラスとして以下を定義する。

class Greeting a where
  name  :: a -> String
  hello :: a -> String
  hello _ = "..." -- hello関数のデフォルトの実装
  bye   :: a -> String
  bye   _ = "..." -- bye関数のデフォルトの実装

instance Greeting Human where
  name  (Human n) = n
  hello h         = "Hi, I'm " ++ name h ++ "."
  bye   _         = "See you."

instance Greeting Dog where
  name _          = "a dog"
  hello _         = "Bark!"

instance Greeting Cat where
  name _          = "a cat"
  bye  _          = "meow"

これでHumanデータ型はinstanceで定義されたメソッドが呼び出されDogデータ型ではhello関数がinstance定義でbyeは型クラスでのデフォルトが適用される。

ファンクター型クラス

ファンクター、アプリカティブファンクター、モナドはややこしいですが有効に使えれば便利な機能になるのかと思います。まずはファンクター型クラスを確認していきたいと思います。実装は以下のようになっています。

ghci>:i Functor
class Functor (f :: * -> *) where
  fmap :: (a -> b) -> f a -> f b
  (<$) :: a -> f b -> f a
  {-# MINIMAL fmap #-}
    -- Defined in ‘GHC.Base’
instance Functor (Either a) -- Defined in ‘Data.Either’
instance Functor [] -- Defined in ‘GHC.Base’
instance Functor Maybe -- Defined in ‘GHC.Base’
instance Functor IO -- Defined in ‘GHC.Base’
instance Functor ((->) r) -- Defined in ‘GHC.Base’
instance Functor ((,) a) -- Defined in ‘GHC.Base’

クラス定義よりFunctor型クラスはfmap、(<$)の2つの関数が定義されていることが条件になることがわかります。fmapの定義を直接見にいっても同様に以下のようになっています。

ghci>:t fmap
fmap :: Functor f => (a -> b) -> f a -> f b

これより、fmapの定義はa型を引数に受け取りb型を返す関数f aの戻り値の型を引数としてf bの結果を返すように見えましたが、関数a -> bをとって関数f a -> 関数 f bを返すようでこの操作を関数の持ち上げ(liftup)というようです。 f aの値としてはJust 1などが使えるのですが、Justの定義を見るとMaybeを返すことがわかり

ghci>:t Just
Just :: a -> Maybe a

Maybeの定義を確認すると以下のようにずらずらと表示されFunctorの型クラスに所属していることがわかります。

ghci>:i Maybe
data Maybe a = Nothing | Just a     -- Defined in ‘GHC.Base’
instance Eq a => Eq (Maybe a) -- Defined in ‘GHC.Base’
instance Monad Maybe -- Defined in ‘GHC.Base’
instance Functor Maybe -- Defined in ‘GHC.Base’
instance Ord a => Ord (Maybe a) -- Defined in ‘GHC.Base’
instance Read a => Read (Maybe a) -- Defined in ‘GHC.Read’
instance Show a => Show (Maybe a) -- Defined in ‘GHC.Show’
instance Applicative Maybe -- Defined in ‘GHC.Base’
instance Foldable Maybe -- Defined in ‘Data.Foldable’
instance Traversable Maybe -- Defined in ‘Data.Traversable’
instance Monoid a => Monoid (Maybe a) -- Defined in ‘GHC.Base’

fmap (*5) Just 3を実行するとJust 15を返しますが、これは関数Justに渡す引数に対してfmapの第一引数として渡している(*5)を適用してその結果をJustに渡している動きになります。

ghci>:t fmap (*5)
fmap (*5) :: (Num b, Functor f) => f b -> f b

fmap (*5)の定義を見るとファンクター値を渡したらファンクター値を返すことがわかります。

リストを利用する場合は、fmap (*5) [1,2,3,4,5]のようにするとリストの中の各値に対して*5を適用した結果のリストが得られます。

アプリカティブファンクター

次にアプリカティブファンクターを確認していきます。定義を確認すると以下のようになりApplicative型クラスに属する場合はFunctorに属させる必要があることがわかります。

class Functor f => Applicative (f :: * -> *) where
  pure :: a -> f a
  (<*>) :: f (a -> b) -> f a -> f b
  (*>) :: f a -> f b -> f b
  (<*) :: f a -> f b -> f a
  {-# MINIMAL pure, (<*>) #-}
    -- Defined in ‘GHC.Base’
instance Applicative (Either e) -- Defined in ‘Data.Either’
instance Applicative [] -- Defined in ‘GHC.Base’
instance Applicative Maybe -- Defined in ‘GHC.Base’
instance Applicative IO -- Defined in ‘GHC.Base’
instance Applicative ((->) a) -- Defined in ‘GHC.Base’
instance Monoid a => Applicative ((,) a) -- Defined in ‘GHC.Base’

pureは値を引数にとり、その値を包んだアプリカティブ値を返します。fmap (*5)はアプリカティブ値を引数に取るためfmap (*5) 3は実行できませんがfmap (*5) (pure 3)は実行することができます。 次に(<*>)ですが定義はfmapに似ていることがわかります。引数が(a -> b)からf (a -> b)に変わっているという違いがありまして、fmapは関数のファンクター値の中の値に適用してくれるのに対して、<*>は関数の入っているファンクター値と値の入っているファンクター値を引数にとって、1つ目のファンクターの中身である関数を2つ目のファンクターの中身に適用します。

ghci>Just (*3) <*> Just 12
Just 36

ではJust (*3)を関数の入っているファンクター値、Just 12を値の入っているファンクター値として<*>に渡し、Just 36を1つ目のファンクターの中身である関数を2つ目のファンクターの中身に適用した結果として返します。<*>はファンクターの中に入った関数適用の結果をファンクターの外に取り出して関数適用することができるので、以下のように連続できの適用も行うことができます。
pure (+) <*> Just 2 <*> Just 3
また<<$>を使うと<*>の第一引数として関数の入っているファンクター値ではなく関数を渡せるようになります。先ほどの例でしたら以下のように修正できます。
(+) <$> Just 2 <*> Just 3

リスト同士で適用する場合は、各要素に対して第一引数の関数の入っているファンクター値の中身である関数を第2引数の各要素に適用します。 例えば[(*0), (+100), (*10)] <*> [1, 2, 3]を実行すると以下のようになります。

ghci>[(*0), (+100), (*10)] <*> [1, 2, 3]
[0,0,0,101,102,103,10,20,30]

モナド

次にモナドの型クラスです。モナドはファンクター、アプリカティブファンクターの強化版であり機能を兼ねそろえています。(ファンクターでは文脈付きの値を保持でき、アプリカティブファンクターでは文脈を持ったまま中の値に関数を適用できるようになった)モナドではそれらに加え普通の値aをとって文脈付きの値を返す関数に、文脈付きの値m aを渡せるようにするという機能があります。ここでのmはファンクターのことでf aの代わりにモナドであることをわかりやすくするようにm aとしています。Monad型クラスの実装を確認して見ます。

ghci>:i Monad
class Applicative m => Monad (m :: * -> *) where
  (>>=) :: m a -> (a -> m b) -> m b
  (>>) :: m a -> m b -> m b
  return :: a -> m a
  fail :: String -> m a
  {-# MINIMAL (>>=) #-}
    -- Defined in ‘GHC.Base’
instance Monad (Either e) -- Defined in ‘Data.Either’
instance Monad [] -- Defined in ‘GHC.Base’
instance Monad Maybe -- Defined in ‘GHC.Base’
instance Monad IO -- Defined in ‘GHC.Base’
instance Monad ((->) r) -- Defined in ‘GHC.Base’
instance Monoid a => Monad ((,) a) -- Defined in ‘GHC.Base’

モナドの機能は(>>=) :: m a -> (a -> m b) -> m bからも確認できます。 試しに実行してみます。

ghci>Just 3 >>= \x -> Just (x+1)
Just 4

return :: a -> m aについてはpureと同じようで引数の値をアプリカティブ値に包んで返します。 MaybeはMonad型クラスに属しているので引数として渡すことができます。ここでは普通の値aをとって文脈付きの値を返す\x -> Just (x+1)に対して文脈付きの値Just 3を渡しています。 関数の結果としてNothingを返すこともできます。

ghci>Just 1 >>= \x -> if x > 2 then Just (x+1) else Nothing
Nothing

次にサンプルで確認してみたいと思います。まずモナドを使わない例として以下を定義する。

type Birds = Int
type Pole = (Birds, Birds)

landLeftA :: Birds -> Pole -> Pole
landLeftA n (left, right) = (left + n, right)

landRightA :: Birds -> Pole -> Pole
landRightA n (left, right) = (left, right + n)
x -: f = f x

ここではBirdsPoleの2つの型シノニムを使います。landLeftA, landRightAでPoleの左右にとまっているBirdの数を管理します。x -: f = f xは関数と引数の順番を逆にする働きをします。 これより(0, 0) -: landLeftA 1 -: landRightA 1 -: landLeftA 2landLeftA 2 (landRightA 1 (landLeftA 1 (0, 0)))は同じ動きをします。

このlandLeftの機能を拡張しPoleの左右のBirdの数の差が4以上の場合は結果をNothingにするという場合はモナドを使えば簡単に修正できます。landLeftA, landRightAの戻り値が値を持つかNothingの状態を持つのでMaybeを返すようにします。

landLeft :: Birds -> Pole -> Maybe Pole
landLeft n (left, right)
    | abs ((left + n) - right) < 4 = Just (left + n, right)
    | otherwise                    = Nothing

landRight :: Birds -> Pole -> Maybe Pole
landRight n (left, right)
    | abs (left - (right + n)) < 4 = Just (left, right + n)
    | otherwise                    = Nothing

それから先ほどと同じように文脈付きの値に包まれたPoleに対してlandLeft, landRightを連続で読んでいき、途中で左右のBirdの数の差が4以上になったらNothingが返されるか確認します。以下を実行してみるとNothingが帰ってますが、return (0, 0) >>= landLeft 1 >>= landRight 4 >>= landLeft (-1)の時点で左右の差が4になってNothingを返すはずなので大丈夫そうです。
print $ return (0, 0) >>= landLeft 1 >>= landRight 4 >>= landLeft (-1) >>= landRight (-2) return (0, 0)としていますがreturnはpureと同じで値をとってファンクター値にしています。それから>>=を連続で使いファンクターの中の値に関数を適用しているのがわかるかと思います。

kaggleのTitanic問題をといてみる

kaggleでチュートリアルがわりに使われているTitanicの問題を解いてみて実際に行われている分析の流れを把握できるようにしたいと思います。 kaggleでは個人の解答が公開、議論されているので普段分析をしない人でも学習にはちょうど良さそうな気がします。

まずはデータの読み込み

import pandas as pd
from pandas import Series,DataFrame

# numpy, matplotlib, seaborn
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
%matplotlib inline

# machine learning
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC, LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB

# get titanic & test csv files as a DataFrame
titanic_df = pd.read_csv("train.csv")
test_df    = pd.read_csv("test.csv")

# preview the data

それから読み込んだ情報を確認してみます。

titanic_df.head()

f:id:steavevaivai:20171123191113p:plain

titanic_df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 12 columns):
PassengerId    891 non-null int64
Survived       891 non-null int64
Pclass         891 non-null int64
Name           891 non-null object
Sex            891 non-null object
Age            714 non-null float64
SibSp          891 non-null int64
Parch          891 non-null int64
Ticket         891 non-null object
Fare           891 non-null float64
Cabin          204 non-null object
Embarked       889 non-null object
dtypes: float64(2), int64(5), object(5)
memory usage: 83.6+ KB

Titanicの問題では学習データのcsvを読み込んで生き残った人(survived=1)を学習し、テストデータに対して分類を行うものとなっています。csvを読み込んだままの情報だと欠損があったり、不要な項目があったり、そのままでは分析に利用できない項目がありますので、実際に生存に影響のある項目だけが残すようにしていき機械学習して分類となります。

モデルの学習

とりあえず生存に明らかに必要のな誘うな項目は削除します。

titanic_df = titanic_df.drop(['PassengerId','Name','Ticket'], axis=1)
test_df    = test_df.drop(['Name','Ticket'], axis=1)

欠損が多い項目は削除しておきます。

titanic_df.drop("Cabin",axis=1,inplace=True)
test_df.drop("Cabin",axis=1,inplace=True)

Embarkedの項目を処理する

Embarkedの項目を分析で扱いようにします。まず各項目がどんな値を取っているのか確認してみます。

titanic_df[["Embarked", "Survived"]].groupby(['Embarked'],as_index=False).mean()

f:id:steavevaivai:20171123191043p:plain
C,Q,Sの3つの値取ることが確認できました。次に、titanic_df.info()実行時にEmbarkedは2項目欠損があったのでとりあえずSで埋めときます。

titanic_df["Embarked"] = titanic_df["Embarked"].fillna("S")

それからEmbarkedの項目が生存に影響しているかグラフ表示してみます。

sns.factorplot('Embarked','Survived', data=titanic_df,size=4,aspect=3)

f:id:steavevaivai:20171123191048p:plain
Sに比べてCとQの方が生存確率は高いかもしれないといった感じなのでしょうか。S,C,Qのぞれぞれの累計値、生存確率も出してみます。

fig, (axis1,axis2,axis3) = plt.subplots(1,3,figsize=(15,5))
sns.countplot(x='Embarked', data=titanic_df, ax=axis1)
sns.countplot(x='Survived', hue="Embarked", data=titanic_df, order=[1,0], ax=axis2)
embark_perc = titanic_df[["Embarked", "Survived"]].groupby(['Embarked'],as_index=False).mean()
sns.barplot(x='Embarked', y='Survived', data=embark_perc,order=['S','C','Q'],ax=axis3)

f:id:steavevaivai:20171123191038p:plain

C,Qかどうかを判断するための項目を分析用にデータフレームに追加します。

embark_dummies_titanic  = pd.get_dummies(titanic_df['Embarked'])
embark_dummies_titanic.drop(['S'], axis=1, inplace=True)

embark_dummies_test  = pd.get_dummies(test_df['Embarked'])
embark_dummies_test.drop(['S'], axis=1, inplace=True)

titanic_df = titanic_df.join(embark_dummies_titanic)
test_df    = test_df.join(embark_dummies_test)
titanic_df.drop(['Embarked'], axis=1,inplace=True)
test_df.drop(['Embarked'], axis=1,inplace=True)
titanic_df.head()

f:id:steavevaivai:20171123191108p:plain

Fare(運賃)の項目を処理する

欠損は中央値で埋めときます。

test_df["Fare"].fillna(test_df["Fare"].median(), inplace=True)

それからデータがFloat64になっていたのでintに変換しておきます。でグラフ表示します。

titanic_df['Fare'] = titanic_df['Fare'].astype(int)
test_df['Fare']    = test_df['Fare'].astype(int)
titanic_df['Fare'].plot(kind='hist', figsize=(15,3),bins=100, xlim=(0,50))

f:id:steavevaivai:20171123191059p:plain
生存者の運賃の平均と中央値を確認してみます。

fare_not_survived = titanic_df["Fare"][titanic_df["Survived"] == 0]
fare_survived     = titanic_df["Fare"][titanic_df["Survived"] == 1]

avgerage_fare = DataFrame([fare_not_survived.mean(), fare_survived.mean()])
std_fare      = DataFrame([fare_not_survived.std(), fare_survived.std()])
avgerage_fare.index.names = std_fare.index.names = ["Survived"]
avgerage_fare.plot(yerr=std_fare,kind='bar',legend=False)
std_fare.plot(yerr=std_fare,kind='bar',legend=False)

f:id:steavevaivai:20171123191104p:plain 運賃も生存に影響があったということでデータフレームに残しておきます。

年齢、性別を処理する

まずは年齢のデータを確認する
欠損値はSurvive毎で平均 ± 標準偏差の範囲の乱数を設定します。

average_age_titanic   = titanic_df["Age"].mean()
std_age_titanic       = titanic_df["Age"].std()
count_nan_age_titanic = titanic_df["Age"].isnull().sum()

average_age_test   = test_df["Age"].mean()
std_age_test       = test_df["Age"].std()
count_nan_age_test = test_df["Age"].isnull().sum()

rand_1 = np.random.randint(average_age_titanic - std_age_titanic, average_age_titanic + std_age_titanic, size = count_nan_age_titanic)
rand_2 = np.random.randint(average_age_test - std_age_test, average_age_test + std_age_test, size = count_nan_age_test)
titanic_df["Age"][np.isnan(titanic_df["Age"])] = rand_1
test_df["Age"][np.isnan(test_df["Age"])] = rand_2

Ageがfloat型だったのでint型に変換します。

titanic_df['Age'] = titanic_df['Age'].astype(int)
test_df['Age']    = test_df['Age'].astype(int)

それからグラフ表示します。

fig, (axis1,axis2) = plt.subplots(1,2,figsize=(15,4))
axis1.set_title('Original Age values - Titanic')
axis2.set_title('New Age values - Titanic')
titanic_df['Age'].dropna().astype(int).hist(bins=70, ax=axis1)
titanic_df['Age'].hist(bins=70, ax=axis2)

f:id:steavevaivai:20171125051734p:plain
年齢別の生存率を表示します。

facet = sns.FacetGrid(titanic_df, hue="Survived",aspect=4)
facet.map(sns.kdeplot,'Age',shade= True)
facet.set(xlim=(0, titanic_df['Age'].max()))
facet.add_legend()

fig, axis1 = plt.subplots(1,1,figsize=(18,4))
average_age = titanic_df[["Age", "Survived"]].groupby(['Age'],as_index=False).mean()
sns.barplot(x='Age', y='Survived', data=average_age)

f:id:steavevaivai:20171125051831p:plain
子供の方が生存率が高いことがわかります。

次に性別も合わせて子供か、成人男性か、成人女性かで分けてSurviveに相関関係があるかみてみます。

def get_person(passenger):
    age,sex = passenger
    return 'child' if age < 16 else sex
titanic_df['Person'] = titanic_df[['Age','Sex']].apply(get_person,axis=1)
test_df['Person']    = test_df[['Age','Sex']].apply(get_person,axis=1)
person_dummies_titanic  = pd.get_dummies(titanic_df['Person'])
person_dummies_titanic.columns = ['Child','Female','Male']
person_dummies_test  = pd.get_dummies(test_df['Person'])
person_dummies_test.columns = ['Child','Female','Male']

グラフで表示すると子供と女性が助かっていることがわかるのでデータフレームに残します。

fig, (axis1,axis2) = plt.subplots(1,2,figsize=(10,5))
sns.countplot(x='Person', data=titanic_df, ax=axis1)
person_perc = titanic_df[["Person", "Survived"]].groupby(['Person'],as_index=False).mean()
sns.barplot(x='Person', y='Survived', data=person_perc, ax=axis2, order=['male','female','child'])

f:id:steavevaivai:20171123201332p:plain

person_dummies_titanic.drop(['Male'], axis=1, inplace=True)
person_dummies_test.drop(['Male'], axis=1, inplace=True)
titanic_df = titanic_df.join(person_dummies_titanic)
test_df    = test_df.join(person_dummies_test)
titanic_df.drop(['Sex'],axis=1,inplace=True)
test_df.drop(['Sex'],axis=1,inplace=True)
titanic_df.drop(['Person'],axis=1,inplace=True)
test_df.drop(['Person'],axis=1,inplace=True)           

SibSp,Parchを処理する

SibSp, Parchは同伴者を表しているようで以下のように家族ずれかどうかを判断するように変換します。

titanic_df['Family'] =  titanic_df["Parch"] + titanic_df["SibSp"]
titanic_df['Family'].loc[titanic_df['Family'] > 0] = 1
titanic_df['Family'].loc[titanic_df['Family'] == 0] = 0

test_df['Family'] =  test_df["Parch"] + test_df["SibSp"]
test_df['Family'].loc[test_df['Family'] > 0] = 1
test_df['Family'].loc[test_df['Family'] == 0] = 0

Familyの項目に変換したので使わなくなったSibSp, Parchは削除します。

titanic_df = titanic_df.drop(['SibSp','Parch'], axis=1)
test_df    = test_df.drop(['SibSp','Parch'], axis=1)

それからグラフ表示したら家族ずれの方が生存確率がたかそうなのがわかるので残しておきます。

fig, (axis1,axis2) = plt.subplots(1,2,sharex=True,figsize=(10,5))

# sns.factorplot('Family',data=titanic_df,kind='count',ax=axis1)
sns.countplot(x='Family', data=titanic_df, order=[1,0], ax=axis1)

# average of survived for those who had/didn't have any family member
family_perc = titanic_df[["Family", "Survived"]].groupby(['Family'],as_index=False).mean()
sns.barplot(x='Family', y='Survived', data=family_perc, order=[1,0], ax=axis2)

axis1.set_xticklabels(["With Family","Alone"], rotation=0)

f:id:steavevaivai:20171123201307p:plain

Pclassを処理する

表示してみる

sns.factorplot('Pclass','Survived',order=[1,2,3], data=titanic_df,size=5)
pclass_dummies_titanic  = pd.get_dummies(titanic_df['Pclass'])
pclass_dummies_titanic.columns = ['Class_1','Class_2','Class_3']

pclass_dummies_test  = pd.get_dummies(test_df['Pclass'])
pclass_dummies_test.columns = ['Class_1','Class_2','Class_3']

titanic_df.drop(['Pclass'],axis=1,inplace=True)
test_df.drop(['Pclass'],axis=1,inplace=True)

f:id:steavevaivai:20171125052137p:plain
pclassは1,2の場合に生存率高いことがわかったのでpclass1,2かどうか判定した結果をデータフレームに付与します。

pclass_dummies_test.drop(['Class_3'], axis=1, inplace=True)
pclass_dummies_titanic.drop(['Class_3'], axis=1, inplace=True)

titanic_df = titanic_df.join(pclass_dummies_titanic)
test_df    = test_df.join(pclass_dummies_test)

最終的にモデルはこのようになりました。

titanic_df.head()

f:id:steavevaivai:20171125055040p:plain

機械学習

このように分析に影響のある項目だけが残るようにデータフレームを操作するのですが、最終的には機械学習で分類を行います。scikit-learnなら簡単に使うことができるので良いと思います。ロジスティック回帰、ランダムフォレストであれば以下のようになります。

# ロジスティック回帰
X_train = titanic_df.drop("Survived",axis=1)
Y_train = titanic_df["Survived"]

logreg = LogisticRegression()
logreg.fit(X_train, Y_train)
Y_pred = logreg.predict(X_test)
logreg.score(X_train, Y_train)
0.8058361391694725

# ランダムフォレスト
random_forest = RandomForestClassifier(n_estimators=100)
random_forest.fit(X_train, Y_train)
Y_pred = random_forest.predict(X_test)
random_forest.score(X_train, Y_train)
0.9640852974186308

今回はランダムフォレストの方がうまく分類できているようです。事前に意味のある情報に分類をしていたのでその場合はランダムフォレストの精度がよくなるのでしょうか。 kaggleのkernelをみたら実際の分析手順を追えるけど欠損の扱いや目的変数に対して影響があるのかを判断して説明できるようになるにはちゃんと勉強した方がよさそうな気がします。

sbt assemlby実行時にリソースフォルダを変更できるようにしたい

まずデフォルトのリソースフォルダを変更してみる

sbt assemblyで実行可能jarを作成する際通常は"src/main/resources"がリソースフォルダとして使われますが、build.sbtにresourceDirectory in Compileの設定をすることによりデフォルトのリソースフォルダを変更することができます。以下では標準のリソースフォルダがsrc/main/resources/developmentになるように変更しています。

lazy val app = (project in file(".")).
  settings(assemblySettings: _*).
  settings(resourceDirectory in Compile := baseDirectory.value / "src" / "main" / "resources" / "development")

sbt assembly実行時に動的にリソースフォルダを変更してみる

例えば以下のようなフォルダの構成でリソースフォルダをビルド時に変更したい場合

src/main/resources
            ├── development: 開発時に使用するリソース
            ├── production: 本番環境リリース時のリソース
            └── staging: ステージ環境リリース時のリソース

この場合、最初はresourceDirectoryを変更するタスクを作ってassemblyと連続して呼び出せば大丈夫なのかと思ったのですがタスク間ではresourceDirectoryの変更などの副作用が起きないような動きをしていて困ったのですが、sbtにはaddCommandAliasというものがあり事前にset resourceDirectory in Compileすれば大丈夫そうでした。結果ですが以下のbuild.sbtの設定ではsbt assemblyではデフォルトのリソースフォルダをsbt prodEnvAssemblyではproduction環境のリソースフォルダをsbt stgEnvAssemlbyではstaging環境のリソースフォルダを使うようになります。

lazy val app = (project in file(".")).
  settings(resourceDirectory in Compile := baseDirectory.value / "src" / "main" / "resources" / "development")

addCommandAlias("prodEnvAssembly", ";set resourceDirectory in Compile := baseDirectory.value / \"src\" / \"main\" / \"resources\" / \"production\"; assembly")
addCommandAlias("stgEnvAssembly", ";set resourceDirectory in Compile := baseDirectory.value / \"src\" / \"main\" / \"resources\" / \"staging\"; assembly")

Sparkで状態を持つobjectを使い回す方法について調べてみた

Sparkでの開発時に既存のjava資源を使用するのはよくあると思うけど、objectが状態を持っていて使い回す必要がある場合も考えられるのでその場合どうすれば良いのか調べてみました。

まず以下のように引数で与えた値だけ内部のカウンターを増やすクラスがあったとする。

class MyCounter extends java.io.Serializable{    
  private var count = 0    
  def countUp(up: Int): Unit ={    
    this.count += up    
  }    
  def getCount = this.count    
}    

このクラスに対して1~10の設定でそれぞれの設定によりカウントアップする数値が決まる使い方を想定する。このクラスに対してログを読み込むたびにカウントアップのメソッドを呼び出したいけど、設定を引数として渡すものになっているので1の設定なら次カウントアップする際も1の設定を維持するといった必要があります。

まず設定とカウンターobjectをRDDにしてmap処理にてカウントアップ後自分自身を返すようにしてみます。この場合は想定通り、map後にカウントアップ後の結果が入っていることが確認できます。

val configRdd = sc.makeRDD(1 to 10)    
val counterRdd = configRdd.map((new MyCounter, _))    

val afterCountup = counterRdd.map{ exeConfig =>    
  exeConfig._1.countUp(exeConfig._2)    
  exeConfig    
}    
afterCountup.foreach{ exeConfig =>    
  println(exeConfig._1.getCount)    
}    

次に一つのcounterオブジェクトをログのRDDのmap処理内でカウントアップしてみる。
これはtask内でobjectが共有されるためcount=1の結果が100個にならない
自分の環境だとmap後の100個のカウンターが0~50の値を取っていた。map処理内でtaskが2個に別れたのか。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
val newc = logRdd.map { log =>    
  myc.countUp(1)    
  myc    
}    
newc.foreach{row => println(row.getCount) }    

foreach内でのカウントアップだが、これだとforeachの中と外では別のobjectのためカウントアップの結果はforeachの外には反映されていない。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
logRdd.foreach { log =>    
  myc.countUp(1)    
}    
println(myc.getCount)    

並列処理とは関係なくなるけど配列のforeach処理内でカウンターobjectをカウントアップするのであれば想定通りカウントアップする。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
logRdd.collect.foreach { log =>    
  myc.countUp(1)    
}    
println(myc.getCount)    

1 ~ 10の設定のobjectに対してlogを読み込んで状態を持つobjectでカウントアップする場合は、カウンターのobjectをRDDにしてそれからmapで配列に変換したログのforeachでカウントアップするようにしたら良さそう。この場合はRDDとして使用するカウンターobjectの数だけ並列に処理することになるのか。

val configRdd = sc.makeRDD(1 to 10)    
val counterRdd = configRdd.map((new MyCounter, _))    
val logRdd = sc.makeRDD(1 to 100)    

val logRddArray = logRdd.collect    
val logRddArrayBroadcast = sc.broadcast(logRddArray)    


val countUp = counterRdd.map{ exeConfig =>    
  logRddArrayBroadcast.value.foreach{log=>    
    exeConfig._1.countUp(exeConfig._2)    
  }    
  exeConfig    
}    
countUp.foreach { exeConfig =>    
  println(exeConfig._1.getCount)    
}    

ApacheSparkで扱うobjectのSerializableの必要性について

ApacheSparkで扱うobjectのSerializableの必要性について

hiveやファイルからデータを読み込んだ直後値はRDD, Dataset, DataFrameになっていて、少ないデータに対して何回もfilter処理を行う必要がある場合に一旦collectして配列に変換しdriver内で処理したい場合もあると思うけど、データを読み込んだ際にnon-serializableなクラスに値をセットしていたらcollectで配列への変換時にエラーが発生したのでその際のメモ

例えば以下のようなjavaのクラスがあったとして、これをsparkで利用するとする

public class IdsBean {
  private int id;
  public IdsBean(int id) { /* compiled code */ }
  public int getId() { /* compiled code */ }
  public void setId(int id) { /* compiled code */ }
}
scala> val rdd = sc.makeRDD(1 to 10)
scala> val rddBean = rdd.map(new IdsBean(_))

foreachやmapの処理は行える

scala> rddBean.foreach{ row => println(row.getId) }
6
7
8
9
10
1
2
3
4
5
scala> rddBean.map(row=>row).foreach(row=>println(row.getId))
1
2
3
4
5
6
7
8
9
10

ただcollectの処理はエラーが発生する

scala> rddBean.collect
[Stage 10:>                                                         (0 + 0) / 2]17/10/07 13:43:41 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 20)
java.io.NotSerializableException: dto.IdsBean
Serialization stack:
    - object not serializable (class: dto.IdsBean, value: dto.IdsBean@2872403e)
    - element of array (index: 0)
    ...

collectの処理ではRDDは各executorに分散されていてそれをdriverに集めるのだが、その際にserialize → deserializeの処理が実行されるのだが、対象のクラスがnon-serializableの場合は シリアライズできないのでcollectの処理を実行する時点でエラーが発生するようだ。

対応策として以下のように対象のクラスを継承してserializableにすることを考えたがそれではダメだった。

scala> class IdsBeanS(id: Int) extends dto.IdsBean(id) with java.io.Serializable

これについてはjava.io.Serializableのjavadocでコメントがあり、non-serializableなクラスのsubtypeをSerilizableにする場合はスーパークラスのpublic, protectedなメンバ変数がシリアライズの対象となるようだ。

To allow subtypes of non-serializable classes to be serialized, the
subtype may assume responsibility for saving and restoring the
state of the supertype's public, protected, and (if accessible)
package fields.  The subtype may assume this responsibility only if
the class it extends has an accessible no-arg constructor to
initialize the class's state.  It is an error to declare a class
Serializable if this is not the case.  The error will be detected at
runtime.

また、executor-driver間でobjectを共有する場合はSerializableでになっている必要がわかった。dirver-executor間で値を共有するタイミングだがmapやforeach、collectなどのapiを実行時のようで、シリアライズ、でシリアライズの対象になっているobjectはapiによっても違っているようだ。 例えば先ほどのnon-serializableなRDDについて以下の処理であれば問題ない。

scala> rddBean.foreach{ row => println(row.getId) }

だが、mapやforeachの処理内でnon-sirializableなクラスを使用しようとしたらエラーが発生する。mapやforeachの内部で使うobjectは各executorに送られるのでserializableである必要があるようだ。

scala> val rdd = sc.makeRDD(1 to 10)
scala> rdd.foreach{rddRow =>
     |   rddBean.foreach(beanRow => println(beanRow.getId))
     | }
17/10/07 14:09:03 ERROR executor.Executor: Exception in task 0.0 in stage 13.0 (TID 26)
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

いつexecutor間にobjectが送られるのかは知っていた方が良いけど、基本的にはSerializableなobjectを使うのが安全そう。 あと同一のexecutorにobjectが送られたとしてもtask内でシリアライズ、デシリアライズして使用するからtask間で同じobjectを参照するということはないらしい。 (基本的なことだけど、object自体が状態を持っていてスレッドセーフでなければ問題が発生するかもしれない) https://twitter.com/maropu/status/889747740858568704

Scalaでseqを操作してみる

scalaでSeqを操作してみる

まず以下のcase classがあったとし、

case class Element(id: Int, time: java.sql.Timestamp)

初期のデータとして以下を保持する

val elementSeq = Array(
  Element(1, new java.sql.Timestamp(new DateTime(2017, 8, 10, 16, 13).getMillis))
  , Element(2, new java.sql.Timestamp(new DateTime(2017, 8, 9, 11, 5).getMillis))
  , Element(3, new java.sql.Timestamp(new DateTime(2017, 5, 22, 9, 13).getMillis))
  , Element(4, new java.sql.Timestamp(new DateTime(2017, 9, 1, 22, 13).getMillis))
  , Element(5, new java.sql.Timestamp(new DateTime(2017, 7, 31, 23, 13).getMillis))
  , Element(6, new java.sql.Timestamp(new DateTime(2017, 8, 15, 12, 7).getMillis))
).toSeq

ソート

Timestampの降順でソートするのは以下のようになる。TimestampはComparableをimplementsしていないので、ソートする際はgetTimeでミリ秒に変換などする

val sortedArray = elementSeq.sortBy(-_.time.getTime)
sortedArray.foreach(println)

unionでseqを結合する。

val elementSeq2 = Array(
  Element(1, new java.sql.Timestamp(new DateTime(2017, 8, 12, 15, 11).getMillis))
  , Element(2, new java.sql.Timestamp(new DateTime(2017, 8, 7, 6, 5).getMillis))
  , Element(3, new java.sql.Timestamp(new DateTime(2017, 5, 9, 16, 13).getMillis))
  , Element(4, new java.sql.Timestamp(new DateTime(2017, 9, 1, 23, 59).getMillis))
  , Element(5, new java.sql.Timestamp(new DateTime(2017, 7, 31, 23, 12).getMillis))
  , Element(6, new java.sql.Timestamp(new DateTime(2017, 8, 14, 23, 4).getMillis))
).toSeq

val unionTimeSeq = elementSeq.map(_.time)
  .union(elementSeq2.map(_.time))
  .sortBy(_.getTime)
unionTimeSeq.foreach(println)

要素の追加とフィルター

次にstartDateで指定した日付以降でフィルターして昇順でソートするのは以下のようになる。

val startDate = new java.sql.Timestamp(new DateTime(2017, 6, 1, 0, 0).getMillis)
val timeArray = (startDate +: unionTimeSeq)
  .filter(_.getTime >= startDate.getTime)
  .sortBy(_.getTime)
timeArray.foreach(println)

指定日の直前のレコードの取り出し

日付のSeqでmapし、事前に日付の降順でソートされたSeqから指定日以前の一番新しいレコードを取り出す処理は以下のようになる。

val validRecordSeq = timeArray.map{time=>
    val record = sortedArray
      .filter(_.time.getTime <= time.getTime)
      .headOption.getOrElse(Element(0, new java.sql.Timestamp(new DateTime(2017, 6, 1, 0, 0).getMillis)))
    (time, record)
}
validRecordSeq.foreach(println)

sparkからhiveを利用してみる

spark-shellにてクラスパスを指定する

spark-shell --driver-class-path 対象クラスパス

開発時にちょっと修正後にいちいちビルドしてデプロイして実行するのが面倒なので、インタラクティブシェルにて動作を確認後、ソースに反映の流れにしたい

hive

SQLを実行してみる

パッケージのインポートからselect文実行まで 以下のテーブルを使用するものとする

show create table sample;
+-----------------------------------------------------------------+
                        createtab_stmt                          |
+-----------------------------------------------------------------+
CREATE  TABLE sample(                                           |
  id int)                                                       |
ROW FORMAT SERDE                                                |
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'          |
STORED AS INPUTFORMAT                                           |
  'org.apache.hadoop.mapred.TextInputFormat'                    |
OUTPUTFORMAT                                                    |
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'  |
  ...                                                           |
+-----------------------------------------------------------------+
import org.apache.spark._
import org.apache.spark.sql.hive._
val hc = new HiveContext(sc)

val select="select * from sample"
val sqlResult = hc.sql(select)
sqlResult.foreach(row=>println(row))

取得対象の絡むと型を指定する

sqlResult.foreach(row=>println(row.getAs[Int]("id")))

sqlの実行結果をmapで型にセットする

sqlResult.map(row => new IdsBean(row.getAs[Int]("id")))

この時変換先の型がserizableでないとエラーになるので、既存のjava資源でserizableをimplementしていない型にセットする場合は、 scalaの方で利用できるように拡張する必要がある

case class IdsSBean(id: Int) extends dto.IdsBean(id) with java.io.Serializable
val idsRDD = sqlResult.map(row => new IdsSBean(row.getAs[Int]("id")))

RDDから配列に変換する

idsRDD.collect

RDDからSeqに変換する

idsRDD.collect.toSeq

summarizationsパターンを試してみる

簡単な数値の集計を行ってみたいと思います。

まず動作確認に使うデータを登録します。 テーブル作成

create table numerical_input(
  user_id int
  , input int
);

動作確認に使うファイルをcsvファイルに保存してhdfsにアップロード ^Aは制御文字になっておりvimであればCtrl +V Ctrl + Aでファイルに入力できる

# vim numerical_input.txt

12345^A10
12345^A8
12345^A21
54321^A1
54321^A47
54321^A8
88888^A7
88888^A12

# hdfs dfs -put numerical_input.txt /input/

それからテーブルにデータを取り込む

load data inpath '/input/numerical_input.txt' into table numerical_input;
select * from numerical_input;

次にscalaでデータをRDDとして読み込んでみる

scala> hc.sql("select * from numerical_input")
res24: org.apache.spark.sql.DataFrame = [user_id: int, input: int]

scala> val numericalRDD = hc.sql("select * from numerical_input") map { row =>
     | (row.getAs[Int]("user_id"), row.getAs[Int]("input"), 1)
     | }

scala> numericalRDD.show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|12345| 10|  1|
|12345|  8|  1|
|12345| 21|  1|
|54321|  1|  1|
|54321| 47|  1|
|54321|  8|  1|
|88888|  7|  1|
|88888| 12|  1|
+-----+---+---+

Datasetのapiを実行してみる

where

scala> numericalRDD.where($"_1" > 60000).show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|88888|  7|  1|
|88888| 12|  1|
+-----+---+---+

sort

scala> numericalRDD.sort($"_2").show
+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|54321|  1|  1|
|88888|  7|  1|
|12345|  8|  1|
|54321|  8|  1|
|12345| 10|  1|
|88888| 12|  1|
|12345| 21|  1|
|54321| 47|  1|
+-----+---+---+

scala側でデータが読み込めるようになったのでタプルの一番目にuser_idでグルーピングを行い、タプルに2番目の要素に最大値、3番目に最小値、4番目にカウント結果が入るようにしてみる。

scala> numericalRDD.groupBy($"_1" as "user_group").agg(max($"_2"), min($"_2"), count($"_3")).show
+----------+-------+-------+---------+
|user_group|max(_2)|min(_2)|count(_3)|
+----------+-------+-------+---------+
|     54321|     47|      1|        3|
|     88888|     12|      7|        2|
|     12345|     21|      8|        3|
+----------+-------+-------+---------+