Chainerによるロジスティック回帰

PythonディープラーニングフレームワークであるChainerを使って簡単なセンチメント分析を行ってみたいと思います。

必要なモジュールのインポート

# import chainer module
import numpy as np
import chainer
from chainer import cuda, Function, gradient_check, Variable
from chainer import optimizers, serializers, utils
from chainer import Link, Chain, ChainList
import chainer.functions as F
import chainer.links as L

# import mecab module
import re
import MeCab
import json

それからロジスティック回帰用のモデルを定義します。レイヤーは一つで入力が単語の種類数で、出力がクラスの数で今回はポジティブ、ネガティブの2つになるように引数で渡します。単語の種類数は学習データを読み無彩にカウントします。

# ロジスティック回帰を行います
class MyRogistic(Chain):
    # パラメータ数に語彙の数, 分類結果数を受け取ります。
    def __init__(self, vocab_count, class_count):
        # モデルを定義しています。中間層もない1層ネットワークです。
        super(MyRogistic, self).__init__(
            l1=L.Linear(vocab_count,class_count),
        )

    def __call__(self,x,y):
        return F.mean_squared_error(self.fwd(x), y)

    # 誤差関数はソフトマックスです
    def fwd(self,x):
        return F.softmax(self.l1(x))

次に入力データから単語の頻出数をカウントできるようにするためのメソッドを準備しておきます。判定に関係なさそうな単語は除外するようにします。

# 形態素解析を行うための関数を定義しておきます
def _mecab_parse_feat(feat):
    return dict(zip(_mecab_feat_labels, feat.split(',')))


def _mecab_node2seq(node, decode_surface=True, feat_dict=True,
                    mecab_encoding='utf-8'):
    # MeCab.Nodeはattributeを変更できない。
    while node:
        error_count = 0
        try:
            if decode_surface:
                node._surface = node.surface
            if feat_dict:  # 品詞の情報をdictで保存
                node.feat_dict = _mecab_parse_feat(
                node.feature
                )
            yield node
        except:
            error_count += 1
        node = node.next

#回帰の邪魔になるストップワードは除外するようにします。
def is_stopword(n):  # <- mecab node
    if len(n._surface) == 0:
        return True
    elif re.search(u'^[\s!-@\[-`\{-~ 、-〜!-@[-`]+$', n._surface):
        return True
    elif re.search(u'^(接尾|非自立)', n.feat_dict['cat1']):
        return True
    elif u'サ変・スル' == n.feat_dict['conj'] or u'ある' == n.feat_dict['orig']:
        return True
    elif re.search(u'^(名詞|動詞|形容詞)', n.feat_dict['pos']):
        return False
    else:
        return True


def not_stopword(n):  # <- mecab node
    return not is_stopword(n)


def node2word(n):  # <- mecab node
    return n._surface


def node2norm_word(n):  # mecab node
    if n.feat_dict['orig'] != '*':
        return n.feat_dict['orig']
    else:
        return n._surface


def word_segmenter_ja(sent, node_filter=not_stopword,
                      node2word=node2norm_word, mecab_encoding='utf-8'):
    if type(sent) == "unicode":
        sent = sent.encode(mecab_encoding)

    nodes = list(
        _mecab_node2seq(_mecab.parseToNode(sent), mecab_encoding=mecab_encoding)
    )
    if node_filter:
        nodes = [n for n in nodes if node_filter(n)]
    words = [node2word(n) for n in nodes]

    return words

日本語の構文解析に使うmecabを初期化します。辞書にはneologdを使用しています。

# 形態素解析の辞書にneologdを使用します
tagger =  MeCab.Tagger(' -Ochasen -d /usr/local/lib/mecab/dic/mecab-ipadic-neologd')
_mecab = MeCab.Tagger()
# 品詞,品詞細分類1,品詞細分類2,品詞細分類3,活用形,活用型,原形,読み,発音
_mecab_feat_labels = 'pos cat1 cat2 cat3 conj conj_t orig read pron'.split(' ')

学習データを初期化します。本番ではDBに保存されているデータをここで読み込むようにします。

# 学習用データ準備
text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

text2 = """ 昔見た近未来に一つ近づいたと思います。"""

text3 = """ 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

text4 = """値段は下がるはずなので今は待ちだと思います。遊ぶにもソフトが少ないですし。
"""

learn_docs = [
    word_segmenter_ja(text1),
    word_segmenter_ja(text2),
    word_segmenter_ja(text3),
    word_segmenter_ja(text4)
]

# 学習データをポジティブ0, ネガティブ1の何方かに分けるようにします
ans_index = [0,0,1,1]

# 語彙の数を求めるのに使う
def word_vocabularies(data):
    vocabularies = {}
    word_id = 0
    for doc in data:
        for word in doc:
            if word not in vocabularies:
                vocabularies[word] = word_id
                word_id += 1
    return vocabularies

# データ数
data_num = len(learn_docs)
# 分類するクラス数 今回はポジティブ、ネガティブの2値分類です
class_num = 2
word_vocab = word_vocabularies(learn_docs)
vocab_count = len(word_vocab)
learn_data = np.zeros(vocab_count * data_num).reshape(data_num, vocab_count).astype(np.float32)

# 学習時に使う入力データを初期化します
for i in range(len(learn_docs)):
    for word in learn_docs[i]:
        learn_data[i, word_vocab[word]] += 1.0

# 学習時に使う答えのデータを初期化します
learn_ans = np.zeros(class_num * data_num).reshape(data_num,class_num).astype(np.float32)
for i in range(data_num):
    learn_ans[i,np.int(ans_index[i])] = 1.0

ロジスティック回帰のモデルを初期化し、学習を行います。

# モデルを初期化します。
model = MyRogistic(vocab_count, class_num)
# パラメータの最適化アルゴリズムにAdamを使います
optimizer = optimizers.Adam()
optimizer.setup(model)

#学習します
# 学習データをランダムにサンプリングするために使います。
n = 4
# バッチサイズです。今回は学習データが4つしかないのでほとんど意味ないです
bs = 25
for j in range(5000):
    # インデックスをランダムにします
    sffindx = np.random.permutation(n)
    accum_loss = None
    for i in range(0, n, bs):
        # ミニバッチを取得
        x = Variable(learn_data[sffindx[i:(i+bs) if (i+bs) < n else n]])
        y = Variable(learn_ans[sffindx[i:(i+bs) if (i+bs) < n else n]])
        # 勾配を初期化
        model.zerograds()
        # 順方向に計算して誤差を取得
        loss = model(x,y)
        # 逆伝搬を行います
        loss.backward()
        # パラメータを更新します。今回はAdamを使っています。
        optimizer.update()
        # accum_loss = loss if accum_loss is None else accum_loss + loss バッチサイズで誤差を累計
    # accum_loss.backward() 逆伝搬
    # optimizer.update() パラメータ更新

学習によりモデルが最適化されたので、それを使って計算を行い正しい結果が得られるか確認します。(今回は学習データが4つなのであんまり意味ないと思いますが)

# パラメータ更新後に回帰を行ってみる
xt = Variable(learn_data, volatile='on')
# 準伝搬での計算結果を取得
yy = model.fwd(xt)

# 答え合わせ用のデータを初期化
ans = yy.data
nrow, ncol = ans.shape
ok = 0
ans_word=['posi', 'nega']
for i in range(nrow):
    # 分類結果の取得
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])
    if learn_ans[i][cls] == 1.0:
        ok += 1
#正解数の表示
print (ok, "/", nrow, " = ", (ok * 1.0)/nrow)

学習をしたあとはモデル、辞書を保存します。

# 学習データの保存

# モデルの保存
# Save the model and the optimizer
print('save the model')
serializers.save_npz('sentiment.model', model)
print('save the optimizer')
serializers.save_npz('sentiment.state', optimizer)

# 辞書の保存
with open(  'vocab.dump', "w") as vocab_f:
    json.dump(str(word_vocab), vocab_f)

保存した辞書を読み込んでみます。

# 学習データの読み込み
load_vocab = {}
with open(  'vocab.dump', "r") as vocab_f:
    for data in json.load(vocab_f)[1:-1].split(","):
        pare_data = data.replace("'", "").split(":")
        load_vocab[pare_data[0].strip()] = pare_data[1].strip()

vocab_count = len(load_vocab)
class_count = 2
# load the model and the optimizer
load_model = MyRogistic(vocab_count, class_count)
load_optimizer = optimizers.Adam()
load_optimizer.setup(load_model)

print('load the model')
serializers.load_npz('sentiment.model',  load_model)
print('load the optimizer')
serializers.load_npz('sentiment.state', load_optimizer)

きちんと読み込まれているか試しに判定してみます。

# テストデータ
test_text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

test_text2=""" 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

test_doc = [
    word_segmenter_ja(test_text1),
    word_segmenter_ja(test_text2)
]

test_data = np.zeros(vocab_count *2 ).reshape(2, vocab_count).astype(np.float32)
for i in range(len(test_doc)):
    for word in test_doc[i]:
        if word in load_vocab:
            test_data[i,np.int(load_vocab[word])] += 1.0

xt = Variable(test_data, volatile='on')
yy = load_model.fwd(xt)

ans = yy.data
nrow, ncol = ans.shape
ans_word=['posi', 'nega']
for i in range(nrow):
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])

chainerなら基本的なことは簡単に試せそうです。