読者です 読者をやめる 読者になる 読者になる

Chainerによるロジスティック回帰

Chainer python

PythonディープラーニングフレームワークであるChainerを使って簡単なセンチメント分析を行ってみたいと思います。

必要なモジュールのインポート

# import chainer module
import numpy as np
import chainer
from chainer import cuda, Function, gradient_check, Variable
from chainer import optimizers, serializers, utils
from chainer import Link, Chain, ChainList
import chainer.functions as F
import chainer.links as L

# import mecab module
import re
import MeCab
import json

それからロジスティック回帰用のモデルを定義します。レイヤーは一つで入力が単語の種類数で、出力がクラスの数で今回はポジティブ、ネガティブの2つになるように引数で渡します。単語の種類数は学習データを読み無彩にカウントします。

# ロジスティック回帰を行う
# Define model
class MyRogistic(Chain):
    def __init__(self, vocab_count, class_count):
        super(MyRogistic, self).__init__(
            l1=L.Linear(vocab_count,class_count),
        )

    def __call__(self,x,y):
        return F.mean_squared_error(self.fwd(x), y)

    def fwd(self,x):
        return F.softmax(self.l1(x))

次に入力データから単語の頻出数をカウントできるようにするためのメソッドを準備しておきます。判定に関係なさそうな単語は除外するようにします。

# util mecab method
def _mecab_parse_feat(feat):
    return dict(zip(_mecab_feat_labels, feat.split(',')))


def _mecab_node2seq(node, decode_surface=True, feat_dict=True,
                    mecab_encoding='utf-8'):
    # MeCab.Nodeはattributeを変更できない。
    while node:
        error_count = 0
        try:
            if decode_surface:
                node._surface = node.surface
            if feat_dict:  # 品詞の情報をdictで保存
                node.feat_dict = _mecab_parse_feat(
                node.feature
                )
            yield node
        except:
            error_count += 1
        node = node.next


def is_stopword(n):  # <- mecab node
    if len(n._surface) == 0:
        return True
    elif re.search(u'^[\s!-@\[-`\{-~ 、-〜!-@[-`]+$', n._surface):
        return True
    elif re.search(u'^(接尾|非自立)', n.feat_dict['cat1']):
        return True
    elif u'サ変・スル' == n.feat_dict['conj'] or u'ある' == n.feat_dict['orig']:
        return True
    elif re.search(u'^(名詞|動詞|形容詞)', n.feat_dict['pos']):
        return False
    else:
        return True


def not_stopword(n):  # <- mecab node
    return not is_stopword(n)


def node2word(n):  # <- mecab node
    return n._surface


def node2norm_word(n):  # mecab node
    if n.feat_dict['orig'] != '*':
        return n.feat_dict['orig']
    else:
        return n._surface


def word_segmenter_ja(sent, node_filter=not_stopword,
                      node2word=node2norm_word, mecab_encoding='utf-8'):
    if type(sent) == "unicode":
        sent = sent.encode(mecab_encoding)

    nodes = list(
        _mecab_node2seq(_mecab.parseToNode(sent), mecab_encoding=mecab_encoding)
    )
    if node_filter:
        nodes = [n for n in nodes if node_filter(n)]
    words = [node2word(n) for n in nodes]

    return words

日本語の構文解析に使うmecabを初期化します。辞書にはneologdを使用しています。

tagger =  MeCab.Tagger(' -Ochasen -d /usr/local/lib/mecab/dic/mecab-ipadic-neologd')
_mecab = MeCab.Tagger()
# 品詞,品詞細分類1,品詞細分類2,品詞細分類3,活用形,活用型,原形,読み,発音
_mecab_feat_labels = 'pos cat1 cat2 cat3 conj conj_t orig read pron'.split(' ')

学習データを初期化します。本番ではDBに保存されているデータをここで読み込むようにします。

# 学習用データ準備
text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

text2 = """ 昔見た近未来に一つ近づいたと思います。"""

text3 = """ 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

text4 = """値段は下がるはずなので今は待ちだと思います。遊ぶにもソフトが少ないですし。
"""

learn_docs = [
    word_segmenter_ja(text1),
    word_segmenter_ja(text2),
    word_segmenter_ja(text3),
    word_segmenter_ja(text4)
]

それから、学習用のデータから単語の種類数、学習データごとでそれぞれの単語の出現数、あと解答データを初期化しています。

def word_vocabularies(data):
    vocabularies = {}
    word_id = 0
    for doc in data:
        for word in doc:
            if word not in vocabularies:
                vocabularies[word] = word_id
                word_id += 1
    return vocabularies

# 学習データ作成完了
data_num = 4
class_num = 2
word_vocab = word_vocabularies(learn_docs)
vocab_count = len(word_vocab)
learn_data = np.zeros(vocab_count * data_num).reshape(data_num, vocab_count).astype(np.float32)
for i in range(len(learn_docs)):
    for word in learn_docs[i]:
        learn_data[i, word_vocab[word]] += 1.0
ans_index = [0,0,1,1]
learn_ans = np.zeros(class_num * data_num).reshape(data_num,class_num).astype(np.float32)
for i in range(data_num):
    learn_ans[i,np.int(ans_index[i])] = 1.0

次にモデルを初期化して、

# Initialize model
model = MyRogistic(vocab_count, class_num)
optimizer = optimizers.Adam()
optimizer.setup(model)

学習を行います。

# Learn
n = 4
bs = 25
for j in range(5000):
    sffindx = np.random.permutation(n)
    accum_loss = None
    for i in range(0, n, bs):
        x = Variable(learn_data[sffindx[i:(i+bs) if (i+bs) < n else n]])
        y = Variable(learn_ans[sffindx[i:(i+bs) if (i+bs) < n else n]])
        model.zerograds()
        loss = model(x,y)
        loss.backward()
        optimizer.update()
        # accum_loss = loss if accum_loss is None else accum_loss + loss バッチサイズで誤差を累計
    # accum_loss.backward() 逆伝搬
    # optimizer.update() パラメータ更新

学習によりモデルが最適化されたので、それを使って計算を行い正しい結果が得られるか確認します。

xt = Variable(learn_data, volatile='on')
yy = model.fwd(xt)

ans = yy.data
nrow, ncol = ans.shape
ok = 0
ans_word=['posi', 'nega']
for i in range(nrow):
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])
    if learn_ans[i][cls] == 1.0:
        ok += 1

print (ok, "/", nrow, " = ", (ok * 1.0)/nrow)

学習をしたあとはモデル、辞書を保存します。

# 学習データの保存

# モデルの保存
# Save the model and the optimizer
print('save the model')
serializers.save_npz('sentiment.model', model)
print('save the optimizer')
serializers.save_npz('sentiment.state', optimizer)

# 辞書の保存
with open(  'vocab.dump', "w") as vocab_f:
    json.dump(str(word_vocab), vocab_f)

保存した辞書を読み込んでみます。

# 学習データの読み込み
load_vocab = {}
with open(  'vocab.dump', "r") as vocab_f:
    for data in json.load(vocab_f)[1:-1].split(","):
        pare_data = data.replace("'", "").split(":")
        load_vocab[pare_data[0].strip()] = pare_data[1].strip()

vocab_count = len(load_vocab)
class_count = 2
# load the model and the optimizer
load_model = MyRogistic(vocab_count, class_count)
load_optimizer = optimizers.Adam()
load_optimizer.setup(load_model)

print('load the model')
serializers.load_npz('sentiment.model',  load_model)
print('load the optimizer')
serializers.load_npz('sentiment.state', load_optimizer)

きちんと読み込まれているか試しに判定してみます。

# テストデータ
test_text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

test_text2=""" 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

test_doc = [
    word_segmenter_ja(test_text1),
    word_segmenter_ja(test_text2)
]

test_data = np.zeros(vocab_count *2 ).reshape(2, vocab_count).astype(np.float32)
for i in range(len(test_doc)):
    for word in test_doc[i]:
        if word in load_vocab:
            test_data[i,np.int(load_vocab[word])] += 1.0

xt = Variable(test_data, volatile='on')
yy = load_model.fwd(xt)

ans = yy.data
nrow, ncol = ans.shape
ans_word=['posi', 'nega']
for i in range(nrow):
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])

chainerなら基本的なことなら簡単に試せそうです。