Cytoscape.jsを試してみた

Cytoscape.jsとは

もともとはクライアントアプリとして欧米の研究機関によって開発されているオープンソースのネットワーク可視化ソフトウェアプラットフォームであるCytoscapeが10年ほど前に作られ現在も継続して開発が行われています。Cytoscapeはデータの読み込み、分析、可視化といった一連の処理を行うことができますが、分析は別のソフト例えばRなどを使い、それから分析結果をエクスポートしてCytoscapeで可視化と言った使われ方もあるようです。クライアントアプリでは分析結果の共有に不便であるためカナダのトロント大学Cytoscape.jsのを開発を始めたようです。
以下のデモを見るのが一番わかりやすいと思います。 js.cytoscape.org

Cytoscapeでの処理フローについて

下記URLのCytoscapeの開発者の記事によりますと以下のような処理フローになるようです。
http://qiita.com/keiono/items/0db6495c4d8bc215de67

    ネットワークデータを読み込む
    アトリビュート(データテーブル)を読み込む
        ソーシャルネットワークの例で言えば
            人の名前、年齢、職業、性別などがノードアトリビュート
            婚姻関係、友人関係、メールの交換された数などといった関係性に関する情報がエッジアトリビュート
    統合されたデータを使って、統計的な解析やフィルタリングなどを行い分析する
    自動レイアウト機能を使って、ノードを見やすく配置する
    ノード、エッジに付随する各種データに基づきマッピングを設定し、描画を行う
    PDFやJavaScriptウィジェットに出力する

Cytoscape.jsを試してみる

では実際にCytoscape.jsを試しに動かしてみたいと思います。
ここのチュートリアルを試してます。

1.ソース取得
ソースををgithubからダウンロードしておきます。
https://github.com/cytoscape/cytoscape.js

2.html作成
以下のhtmlを記述します。srcのパスはダウンロードしたCytoscape.jsのdist/cytoscape.min.jsを参照するように修正しておいてください。

<!doctype html>

<html>

<head>
    <title>Tutorial 1: Getting Started</title>
    <script src="cytoscape.js"></script>
</head>

<style>
    #cy {
        width: 100%;
        height: 100%;
        position: absolute;
        top: 0px;
        left: 0px;
    }
</style>

<body>
    <div id="cy"></div>
</body>
</html>

divタグに対してcanvasを埋め込んでグラフの描画を行いまして、cssで表示領域を設定しています。

3.グラフインスタンスの作成

var cy = cytoscape({
  container: document.getElementById('cy'),
  elements: [
    { data: { id: 'a' } },
    { data: { id: 'b' } },
    {
      data: {
        id: 'ab',
        source: 'a',
        target: 'b'
      }
    }]
});

上記スクリプトで描画するノードとエッジを指定しています。data: { id: ‘a’ } 、data: { id: ‘b’ } とありますので2つのノードが描画されます。それから、data: {id: ‘ab’,source: ‘a’,target: ‘b’}とありますので2つのノードを結んだエッジも表示されます。
先ほど作成したhtmlにスクリプトを記述して開いてみるとグラフが表示されたことが確認できます。cytoscape.jsには自動レイアウト機能がありますのでウィンドウのサイズを変更して開き直すとノードが縦に並んだり、横に並んだりして自動でレイアウトして描画されることが確認できます。

4.グラフの描画設定 3で描画したスクリプトのelementsに続けて以下のstyleを追記してください。

style: [
    {
        selector: 'node',
        style: {
            shape: 'hexagon',
            'background-color': 'red',
            label: 'data(id)'
        }
    }]

それから開き直すとノードが赤色六角形のID付きで表示されることが確認できたかと思います。
styleについて他にどんな設定があるかは以下を見たら良いと思います。
https://github.com/cytoscape/cytoscape.js/blob/master/documentation/md/style.md

5.ノードの配置設定
ノードをどのように配置するかはlayoutで指定できるようになっています。例えばノードを円状に並べて描画したい場合は、以下を追記したら行えます。

cy.layout({
    name: 'circle'
});

円状に並べたのをわかりやすくするためhtmlを編集しノードを増やしてみます。

<!doctype html>

<html>

<head>
    <title>Tutorial 1: Getting Started</title>
    <script src="../cytoscape.js/dist/cytoscape.min.js"></script>
</head>

<style>
    #cy {
        width: 100%;
        height: 100%;
        position: absolute;
        top: 0px;
        left: 0px;
    }
</style>

<body>
    <div id="cy"></div>
    <script>
      var cy = cytoscape({
        container: document.getElementById('cy'),
        elements: [
          { data: { id: 'a' } },
          { data: { id: 'b' } },
          {
            data: {
              id: 'ab',
              source: 'a',
              target: 'b'
            }
          }],
        style: [
        {
            selector: 'node',
            style: {
                shape: 'hexagon',
                'background-color': 'red',
                label: 'data(id)'
            }
        }]
      });
      for (var i = 0; i < 10; i++) {
          cy.add({
              data: { id: 'node' + i }
              }
          );
          var source = 'node' + i;
          cy.add({
              data: {
                  id: 'edge' + i,
                  source: source,
                  target: (i % 2 == 0 ? 'a' : 'b')
              }
          });
      }
      cy.layout({
        name: 'circle'
      });
    </script>
</body>
</html>

これで開きなおしてみると以下のように円状に並んで表示されることが確認できます。 f:id:steavevaivai:20170325065549p:plain

レイアウトの描画設定は自分で行うこともできここにまとめられているのですがcircleに配置するのであれば、以下の設定でも同様となっております。

var options = {
  name: 'circle',

  fit: true, // whether to fit the viewport to the graph
  padding: 30, // the padding on fit
  boundingBox: undefined, // constrain layout bounds; { x1, y1, x2, y2 } or { x1, y1, w, h }
  avoidOverlap: true, // prevents node overlap, may overflow boundingBox and radius if not enough space
  radius: undefined, // the radius of the circle
  startAngle: 3 / 2 * Math.PI, // where nodes start in radians
  sweep: undefined, // how many radians should be between the first and last node (defaults to full circle)
  clockwise: true, // whether the layout should go clockwise (true) or counterclockwise/anticlockwise (false)
  sort: undefined, // a sorting function to order the nodes; e.g. function(a, b){ return a.data('weight') - b.data('weight') }
  animate: false, // whether to transition the node positions
  animationDuration: 500, // duration of animation in ms if enabled
  animationEasing: undefined, // easing of animation if enabled
  ready: undefined, // callback on layoutready
  stop: undefined // callback on layoutstop
};

cy.layout( options );

6.ノードごとでスタイルを変えてみる
ノードごとでスタイルを変更したい場合があると思うのですが、その場合はノードに付与したIDを指定してstyle属性に設定を追加してあげれば行えます。
例えばID=node1に対してスタイルを設定したい場合は以下のようになります。

{
    selector: '#node1',
    style: {
        'background-image': 'https://farm8.staticflickr.com/7272/7633179468_3e19e45a0c_b.jpg',
        'height': 80,
        'width': 80,
        'background-fit': 'cover',
        'border-color': '#000',
        'border-width': 3,
        'border-opacity': 0.5
}

結果、このようになりスタイルが適用されているのが確認できます。 f:id:steavevaivai:20170325065606p:plain

触ってみた感想として表示するデータに合わせて柔軟にレイアウトを調整するというよりかは、事前に表示するデータを想定した上でそれに合わせてレイアウトをどう調整するのかが重要そうな気がしました。他の分析ソフトからエッジとノードあとアトリビュートをエクスポートできたら分析結果の共有に便利そうです。

Rでネットワーク分析をしてみる

ちょっと興味が湧いたのでigraphを使ってネットワーク分析を行ってみたいと思います。

igraph

igraphとは?
ネットワーク解析周りの関数が多数登録されており、簡単にネットワーク解析を行うことができる。最近ではグラフデータベースのneo4jなどでネットワーク分析をするのが流行ってそうですが、簡単に試す場合はRとかapach sparkを使う方が楽そうな気がします。

インストール 

install.packages(“igraph”)

バージョン確認

packageVersion(“igraph”)

パッケージの読みこみ

library(igraph)

サンプルデータで動かしてみる

以下のサイトから"Zachary’s karate club"をダウンロードする。データの中身は大学の空手部の交友関係情報らしいです。
http://www-personal.umich.edu/~mejn/netdata/

igraphを使う

install.packages(“igraph”)
library(“igraph”)

データを読み込む

“Zachary’s karate club"のデータをダウンロード後解答したkarate.gmlがグラフの情報を表すデータになります。試しに開いてみると以下のようにノードとエッジの情報を持った有向グラフを表していることがわかります。

Creator "Mark Newman on Fri Jul 21 12:39:27 2006"
graph
[
  node
  [
    id 1
  ]
  node
  [
    id 2
  ]

~~~

edge
[
  source 34
  target 32
]
edge
[
  source 34
  target 33
]
]

今回はノードにラベルの情報が含まれていないため、グラフ表示する際に名前の情報は含まれません。ノードにラベルを含む場合は以下のようにnodeの要素にlabelの項目を追加する形式で保存されます。

Creator "Mark Newman on Sat Jul 22 05:32:16 2006"
graph
[
  directed 0
  node
  [
    id 0
    label "BrighamYoung"
    value 7
  ]
  node
  [
    id 1
    label "FloridaState"
    value 0
  ]

~~~

ちなみにsourceとtargetが同一のエッジが複数あった場合強いつながりとして処理してくれてそうだった。
gml形式のデータについて詳しくは下記参照
http://graphml.graphdrawing.org/

今回はすでにノードとエッジの情報は分析済みのgml形式のデータでRStudioから表示させる動きを試してみたいと思います。 グラフデータの読み込みはread.graphを使います。

karate_gh <- read.graph(“./karate.gml”, format=“gml”)

データの可視化

plot関数で読み込んだグラフを可視化します。

plot(karate_gh, vertex.size=4, edge.arrow.size=0.2, layout=layout.fruchterman.reingold)

コミュニティ分析

leading.eigenvector.community関数によりグラフのデータからコミュニティの情報を得ます。得たデータをplot関数に利用することでコミュニティごとで色分けした表示が行えます。

karate_com <- leading.eigenvector.community(karate_gh)
plot(karate_gh, vertex.size=4, edge.arrow.size=0.2, vertex.color=karate_com$membership, layout=layout.fruchterman.reingold)

中心性解析

page.rank関数によりどれだけコミュニティの中心に位置していかを数値化できるので、これをプロット時のノードのサイズとして表すことで中心性解析の可視化が行える。

karate_pr <- page.rank(karate_gh, directed=TRUE)
plot(karate_gh, vertex.size=karate_pr$vector*200, vertex.color=karate_com$membership, edge.arrow.size=0.2, layout=layout.fruchterman.reingold)

f:id:steavevaivai:20170322064058p:plain

とりあえずノードとエッジがはっきりしたgml形式のデータがあればRで簡単に可視化できることがわかった。ブラウザ上でデータをみたい場合は、以下のossが使えそう。
js.cytoscape.org

Chainerによるロジスティック回帰

PythonディープラーニングフレームワークであるChainerを使って簡単なセンチメント分析を行ってみたいと思います。

必要なモジュールのインポート

# import chainer module
import numpy as np
import chainer
from chainer import cuda, Function, gradient_check, Variable
from chainer import optimizers, serializers, utils
from chainer import Link, Chain, ChainList
import chainer.functions as F
import chainer.links as L

# import mecab module
import re
import MeCab
import json

それからロジスティック回帰用のモデルを定義します。レイヤーは一つで入力が単語の種類数で、出力がクラスの数で今回はポジティブ、ネガティブの2つになるように引数で渡します。単語の種類数は学習データを読み無彩にカウントします。

# ロジスティック回帰を行います
class MyRogistic(Chain):
    # パラメータ数に語彙の数, 分類結果数を受け取ります。
    def __init__(self, vocab_count, class_count):
        # モデルを定義しています。中間層もない1層ネットワークです。
        super(MyRogistic, self).__init__(
            l1=L.Linear(vocab_count,class_count),
        )

    def __call__(self,x,y):
        return F.mean_squared_error(self.fwd(x), y)

    # 誤差関数はソフトマックスです
    def fwd(self,x):
        return F.softmax(self.l1(x))

次に入力データから単語の頻出数をカウントできるようにするためのメソッドを準備しておきます。判定に関係なさそうな単語は除外するようにします。

# 形態素解析を行うための関数を定義しておきます
def _mecab_parse_feat(feat):
    return dict(zip(_mecab_feat_labels, feat.split(',')))


def _mecab_node2seq(node, decode_surface=True, feat_dict=True,
                    mecab_encoding='utf-8'):
    # MeCab.Nodeはattributeを変更できない。
    while node:
        error_count = 0
        try:
            if decode_surface:
                node._surface = node.surface
            if feat_dict:  # 品詞の情報をdictで保存
                node.feat_dict = _mecab_parse_feat(
                node.feature
                )
            yield node
        except:
            error_count += 1
        node = node.next

#回帰の邪魔になるストップワードは除外するようにします。
def is_stopword(n):  # <- mecab node
    if len(n._surface) == 0:
        return True
    elif re.search(u'^[\s!-@\[-`\{-~ 、-〜!-@[-`]+$', n._surface):
        return True
    elif re.search(u'^(接尾|非自立)', n.feat_dict['cat1']):
        return True
    elif u'サ変・スル' == n.feat_dict['conj'] or u'ある' == n.feat_dict['orig']:
        return True
    elif re.search(u'^(名詞|動詞|形容詞)', n.feat_dict['pos']):
        return False
    else:
        return True


def not_stopword(n):  # <- mecab node
    return not is_stopword(n)


def node2word(n):  # <- mecab node
    return n._surface


def node2norm_word(n):  # mecab node
    if n.feat_dict['orig'] != '*':
        return n.feat_dict['orig']
    else:
        return n._surface


def word_segmenter_ja(sent, node_filter=not_stopword,
                      node2word=node2norm_word, mecab_encoding='utf-8'):
    if type(sent) == "unicode":
        sent = sent.encode(mecab_encoding)

    nodes = list(
        _mecab_node2seq(_mecab.parseToNode(sent), mecab_encoding=mecab_encoding)
    )
    if node_filter:
        nodes = [n for n in nodes if node_filter(n)]
    words = [node2word(n) for n in nodes]

    return words

日本語の構文解析に使うmecabを初期化します。辞書にはneologdを使用しています。

# 形態素解析の辞書にneologdを使用します
tagger =  MeCab.Tagger(' -Ochasen -d /usr/local/lib/mecab/dic/mecab-ipadic-neologd')
_mecab = MeCab.Tagger()
# 品詞,品詞細分類1,品詞細分類2,品詞細分類3,活用形,活用型,原形,読み,発音
_mecab_feat_labels = 'pos cat1 cat2 cat3 conj conj_t orig read pron'.split(' ')

学習データを初期化します。本番ではDBに保存されているデータをここで読み込むようにします。

# 学習用データ準備
text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

text2 = """ 昔見た近未来に一つ近づいたと思います。"""

text3 = """ 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

text4 = """値段は下がるはずなので今は待ちだと思います。遊ぶにもソフトが少ないですし。
"""

learn_docs = [
    word_segmenter_ja(text1),
    word_segmenter_ja(text2),
    word_segmenter_ja(text3),
    word_segmenter_ja(text4)
]

# 学習データをポジティブ0, ネガティブ1の何方かに分けるようにします
ans_index = [0,0,1,1]

# 語彙の数を求めるのに使う
def word_vocabularies(data):
    vocabularies = {}
    word_id = 0
    for doc in data:
        for word in doc:
            if word not in vocabularies:
                vocabularies[word] = word_id
                word_id += 1
    return vocabularies

# データ数
data_num = len(learn_docs)
# 分類するクラス数 今回はポジティブ、ネガティブの2値分類です
class_num = 2
word_vocab = word_vocabularies(learn_docs)
vocab_count = len(word_vocab)
learn_data = np.zeros(vocab_count * data_num).reshape(data_num, vocab_count).astype(np.float32)

# 学習時に使う入力データを初期化します
for i in range(len(learn_docs)):
    for word in learn_docs[i]:
        learn_data[i, word_vocab[word]] += 1.0

# 学習時に使う答えのデータを初期化します
learn_ans = np.zeros(class_num * data_num).reshape(data_num,class_num).astype(np.float32)
for i in range(data_num):
    learn_ans[i,np.int(ans_index[i])] = 1.0

ロジスティック回帰のモデルを初期化し、学習を行います。

# モデルを初期化します。
model = MyRogistic(vocab_count, class_num)
# パラメータの最適化アルゴリズムにAdamを使います
optimizer = optimizers.Adam()
optimizer.setup(model)

#学習します
# 学習データをランダムにサンプリングするために使います。
n = 4
# バッチサイズです。今回は学習データが4つしかないのでほとんど意味ないです
bs = 25
for j in range(5000):
    # インデックスをランダムにします
    sffindx = np.random.permutation(n)
    accum_loss = None
    for i in range(0, n, bs):
        # ミニバッチを取得
        x = Variable(learn_data[sffindx[i:(i+bs) if (i+bs) < n else n]])
        y = Variable(learn_ans[sffindx[i:(i+bs) if (i+bs) < n else n]])
        # 勾配を初期化
        model.zerograds()
        # 順方向に計算して誤差を取得
        loss = model(x,y)
        # 逆伝搬を行います
        loss.backward()
        # パラメータを更新します。今回はAdamを使っています。
        optimizer.update()
        # accum_loss = loss if accum_loss is None else accum_loss + loss バッチサイズで誤差を累計
    # accum_loss.backward() 逆伝搬
    # optimizer.update() パラメータ更新

学習によりモデルが最適化されたので、それを使って計算を行い正しい結果が得られるか確認します。(今回は学習データが4つなのであんまり意味ないと思いますが)

# パラメータ更新後に回帰を行ってみる
xt = Variable(learn_data, volatile='on')
# 準伝搬での計算結果を取得
yy = model.fwd(xt)

# 答え合わせ用のデータを初期化
ans = yy.data
nrow, ncol = ans.shape
ok = 0
ans_word=['posi', 'nega']
for i in range(nrow):
    # 分類結果の取得
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])
    if learn_ans[i][cls] == 1.0:
        ok += 1
#正解数の表示
print (ok, "/", nrow, " = ", (ok * 1.0)/nrow)

学習をしたあとはモデル、辞書を保存します。

# 学習データの保存

# モデルの保存
# Save the model and the optimizer
print('save the model')
serializers.save_npz('sentiment.model', model)
print('save the optimizer')
serializers.save_npz('sentiment.state', optimizer)

# 辞書の保存
with open(  'vocab.dump', "w") as vocab_f:
    json.dump(str(word_vocab), vocab_f)

保存した辞書を読み込んでみます。

# 学習データの読み込み
load_vocab = {}
with open(  'vocab.dump', "r") as vocab_f:
    for data in json.load(vocab_f)[1:-1].split(","):
        pare_data = data.replace("'", "").split(":")
        load_vocab[pare_data[0].strip()] = pare_data[1].strip()

vocab_count = len(load_vocab)
class_count = 2
# load the model and the optimizer
load_model = MyRogistic(vocab_count, class_count)
load_optimizer = optimizers.Adam()
load_optimizer.setup(load_model)

print('load the model')
serializers.load_npz('sentiment.model',  load_model)
print('load the optimizer')
serializers.load_npz('sentiment.state', load_optimizer)

きちんと読み込まれているか試しに判定してみます。

# テストデータ
test_text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。"""

test_text2=""" 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。
正直信者が騒いでいるだけに思います。
"""

test_doc = [
    word_segmenter_ja(test_text1),
    word_segmenter_ja(test_text2)
]

test_data = np.zeros(vocab_count *2 ).reshape(2, vocab_count).astype(np.float32)
for i in range(len(test_doc)):
    for word in test_doc[i]:
        if word in load_vocab:
            test_data[i,np.int(load_vocab[word])] += 1.0

xt = Variable(test_data, volatile='on')
yy = load_model.fwd(xt)

ans = yy.data
nrow, ncol = ans.shape
ans_word=['posi', 'nega']
for i in range(nrow):
    cls = np.argmax(ans[i,:])
    print(ans_word[cls])

chainerなら基本的なことは簡単に試せそうです。

ApacheSparkの基本的なデータ操作

データ操作

Spark2.1で動作確認

build.sbt

IDEAから実行できるようにbuild.sbtに以下を追加。対象のライブラリが存在しない場合はmaven_centralで確認する。

val sparkVersion = "2.1.0"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion % "compile",
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "compile"
)

RDD

実行時の設定の読み込み

val conf = new SparkConf().setAppName("WordCountTop3").setMaster("local[*]")
val sc = new SparkContext(conf)

ハイフン区切りで単語出現数のカウント

val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
wordAndCountRDD.collect.foreach(println)

単語出現数をソートして上位3件を表示

val filePath = "build.sbt"
val wordAndCountRDD = sc.textFile(filePath)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordAndCountRDD.sortBy( _._2.toInt, ascending = false).take(3).foreach(println)

DataFrame

設定の読み込み

val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val sc = new SparkContext(conf)
// spark2からはSparkSessioのほうが推奨だけどエラーで使えなかった
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Spark2.1ではSQLContextではなくSparkSessionの方を推奨しているけど、エラーが発生して利用できなかった。とりあえずSQLContextで試してみる。

RDDでファイルを読み込む

val dessertRDD = sc.textFile("data/dessert-menu.csv")
  .map{ record =>
  val splitRecord = record.split(",")
  val menuId = splitRecord(0)
  val name = splitRecord(1)
  val price = splitRecord(2).toInt
  val kcal = splitRecord(3).toInt
  Dessert(menuId, name, price, kcal)
}

RDDからDataFrameへ変換

// RDD → DataFrame
val dessertDF:DataFrame = sqlContext.createDataFrame(dessertRDD)

スキーマ定義の表示

println("show schema")
println(dessertDF.printSchema)

DataFrameに対してクエリを投げる。生成ずみのDataFrameに対して直接クエリを投げるのではなくワークテーブルを作成して、そちらに対してクエリを実行する。ATAN2などの組み込み関数については公式のドキュメントを確認する。

dessertDF.createOrReplaceTempView("dessert")
val numOver300KcalDF = sqlContext.sql(
  "select count(*) as num_of_over_300Kcal FROM dessert where kcal >= 260"
)
println(numOver300KcalDF.show)
println(sqlContext.sql("select atan2(1, 3) as `ATAN2(1, 3)`").show)

DataFrameをjoinしてクエリを投げる。

val dessertOrderRDD = sc.textFile("data/dessert-order.csv")
  .map{record =>
    val splitRecord = record.split(",")
    val sId = splitRecord(0)
    val menuId = splitRecord(1)
    val num = splitRecord(2).toInt
    DessertOrder(sId, menuId, num)
  }
val dessertOrderDF:DataFrame = sqlContext.createDataFrame(dessertOrderRDD)
dessertOrderDF.createOrReplaceTempView("desert_order")

println(sqlContext.sql(
  """
    |select do.sId
    |  , d.name
    |  , do.num * d.price as amout_per_menu_per_slip
    |from desert d
    |join desert_order do on d.menuId = do.menuId
  """.stripMargin).show

Apache Sparkを触ってみた

Apache Sparkとは?

Hadoopと同じく分散処理のフレームワークHadoopではMapReduceと言って複数マシンで分散処理を行ってから結果をストレージに書き出す。1回の処理では終わらない場合はデータの処理フローを形成することになり、よみ出し→分散処理→書き込みを繰り返す動きをする。MapReduceでは処理の中間結果が常にストレージに書き出されるためデータ量が大きくなっても動作し、障害からの回復も容易であると言ったメリットがある。
しかしこれではあるデータの部分集合に対し複数回で処理する場合、都度すべてのデータをストレージに書き込む処理が行われるため必要な計算コストが大きくなってしまう。
Sparkでは連続する処理の中で無駄なディスクやネットワークのI/Oを起こさないように処理することでMapReduceの問題に対処している。Sparkの高速化が期待できるのは、複数回の反復処理や連続する変換処理になります。

Sparkのデータ構造

ApacheSparkのデータ処理には「RDD」と呼ばれるデータ構造を利用する。RDDは複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションという塊に分割される。RDDパーティションごとに複数のマシンで処理することで分散処理が行われる。

Sparkの分散処理環境

RDDのデータ構造を分散処理するためのクラスタ管理システムには以下のようなものがある。
* YARN
Hadoopクラスタ管理システム。Hadoop分散ファイルシステムであるHDFSで利用するとI/Oが効率化されるらしい。 * Mesos * Spark Standalone Sparkに同梱されているクラスタ管理システム。別途クラスタ管理システムを用意する必要がなく手軽に利用できる。

RDDによるタスク処理

RDDには以下の情報が含まれている
* RDDの元になるデータの情報、または変換前のRDD
* RDD成の元になるデータのロード方法、変換方法
* 変換後のRDDの要素の型
* 変換後のRDDパーティション
* RDDが永続化されているかどうか

RDD内に含まれる上記の情報と遅延評価の性質により効率的なタスクのスケジューリングが可能となっているらしいけど詳しくは別でドキュメントを見たほうがよさそう。

http://kimutansk.hatenablog.com/entry/20130902/1378142510
https://www2.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf

Sparkのマシン構成

ホスト名 役割 説明
spark-client クライアント アプリケーションを起動するクライアント
spark-master マスターノード クラスタ内のリソースを管理
spark-worker00 ワーカーノード 働く

Sparkインストール

とりあえずmacにインストールしてみる
以下のページからsparkをダウンロードして解凍後binにパスを通すととりあえず動きを試すことができる http://spark.apache.org/downloads.html

パスを通した後は以下のコマンドが実行できたらApacheSparkはインストールされている

spark-shell –version

IDEAでの開発

  1. ideaでSBTプロジェクトを作成
    使用するscalaのバージョンは公式ページでapiをサポートしているバージョンを事前に確認しておく(2017/3/20時点ではscala2.12からは利用できなかった)
    http://spark.apache.org/docs/latest/
  2. 実行可能jarが生成できるようにsbt-assemblyをプラグインに追加。追加方法は以下のgithubのページを参考にする。
    https://github.com/sbt/sbt-assembly
    plugins.sbtに以下を追加する
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
    org.jboss.interceptorがunresolvedになったらplugins.sbtにさらに以下を追記
    resolvers += "JBoss" at "https://repository.jboss.org"
  3. build.sbtのlibraryDependenciesにspark-coreを追加する。
    バージョンには気をつける
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-core_2.11" % "2.1.0"
    )
  4. abt-assemblyのオプションをbuild.sbtに追加
    詳しくは以下のペジを参照。ファイル読み込み時に複数ファイルが見つかった時どうするかのために必要
    https://github.com/sbt/sbt-assembly
    assemblyMergeStrategy in assembly := {
      case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
      case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
      case "application.conf"                            => MergeStrategy.concat
      case "unwanted.txt"                                => MergeStrategy.discard
      case x =>
        val oldStrategy = (assemblyMergeStrategy in assembly).value
        oldStrategy(x)
    }
    ついでに出力するjarファイル名も設定しておく
    assemblyJarName in assembly := "something.jar"
  5. 試しにjar出力
    ApacheSparkで実行可能なjarを試しに出力しておく > sbt assembly で多分失敗するので、その場合はlibraryDependenciesをprovidedにしておく。
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided"
    それから再度sbt assemlbyを実行して所定のパスにjarが生成されることを確認。ただprovidedの場合有効なスコープがコンパイル時とテスト時になる。今回はIDEAから直接実行して簡単に動きを見るだけなので一旦以下のように変更しておく。
      "org.apache.spark" % "spark-core_2.11" % "2.1.0" % "compile"

簡単なスクリプトをIDEAから動かしてみる

それではSparkを実行するsbtプロジェクトが準備できたので試しにサンプルコードを動かしてみます。 以下のページにある単語の出現数をカウントするscalaのプログラムを動かしてみたいと思います。 http://spark.apache.org/examples.html

import org.apache.spark.{SparkConf, SparkContext}

object spark_shell {

  def main(args:Array[String]): Unit ={

    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    try{
      val filePath = "build.sbt"
      val wordAndCountRDD = sc.textFile(filePath)
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      wordAndCountRDD.collect.foreach(println)
    }
    sc.stop()
  }
}

まあ、単純に読み込んだファイルをスペース区切りで分割してwordごとのmapでカウントアップするだけです。 これで実行してみると以下のエラーが発生すると思います。
- SparkException: A master URL must be set in your configuration

このエラーについてですが、sparkのタスクを実行する場合spark-clientからspark-masterを指定してプログラム実行の流れになるのですが、IDEAでそのまま実行するだけではmaster未指定のため上記エラーが発生すると思われます。本番環境のクラスタリング構成で動かす場合はpark-submitのオプションで指定するのかと思います。 http://spark.apache.org/docs/latest/submitting-applications.html

今回は手軽に動きを試したいのでプログラム中のSparkConfを以下のように変更しmasterとして自分自身を指定して動きを見たいと思います。

val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")

それから実行してみると環境情報等読み込んだ後に以下のような出力結果が得られると思います。

(oldStrategy,1)
(x,1)
(endsWith,1)
("2.11.8",1)
(assembly).value,1)
(scalaVersion,1)
(,97)
(@,2)
("unwanted.txt",1)
(libraryDependencies,1)
("spark-core_2.11",1)
("2.9.7",1)
("joda-time",2)
(PathList(ps,1)
(_*),2)
(assembly,3)

Scalaで数式をパースして整形表示

数式パーサ

Scalaのコップ本を一通り読んだので振り返りも兼ねて数式をパースして計算、整形するDSLを実装したいと思います。全部自力で作れたら良いのですが、ソースコードはほとんど(http://blog.j5ik2o.me/entry/20101126/1290763057) とコップ本から拝借しています。    

パースした数式は探索して評価を行うので、式を表すExpressionトレイトと訪問するExpressionVisitorトレイトを用意します。

// 式を訪問するビジター
trait ExpressionVisitor {
  def visit(e:Expression):BigDecimal
}

// 式を表すトレイト
trait Expression {
  def accept(visitor:ExpressionVisitor):BigDecimal = {
    visitor.visit(this)
  }
}

それから各数式を表すのに使うケースクラスを定義しておきます。

// 各式を表すケースクラス。これを使ってAST(抽象構文木)を作ります。
case class Value(digit:BigDecimal) extends Expression
case class Add(expr1:Expression, expr2:Expression) extends Expression
case class Sub(expr1:Expression, expr2:Expression) extends Expression
case class Plus(expr:Expression) extends Expression
case class Minus(expr:Expression) extends Expression
// カッコで囲んでいる部分
case class Parenthesized(expr:Expression) extends Expression
case class Multiply(expr1:Expression, expr2:Expression) extends Expression
case class Divide(expr1:Expression, expr2:Expression) extends Expression
// 数式と評価結果を表す
case class ExecResult(exp:Expression, result: Value) extends Expression

RegexParsersを使って数式をパースします。構文解析にはASTというものを使います。数式のクラスは入れ子構造になるのですが、ASTで解析する時はまず+,-次に*,/それからカッコというように優先度の低いものを先に評価することで正しくパースが行えるようです。この辺りは既にあるものを見るならなんとなくわかるけど、自分で一から考えるのは難しそう

// 式のパーサ
class ExprParser extends RegexParsers {

  def parse(data:String) = parseAll(expression, data)

  // expression ::= term { "+" term | "-" term }
  def expression : Parser[Expression] = term~rep("+"~term | "-"~term) ^^ {
    case opr1~lists => {
      var operand1 = opr1
      lists.foreach {
        l => l match {
          case "+"~f => { operand1 = Add(operand1, f) }
          case "-"~f => { operand1 = Sub(operand1, f) }
        }
      }
      operand1
    }
  }

  // term ::= factor { "*" factor | "/" factor }
  def term : Parser[Expression] = factor~rep("*"~factor | "/"~factor) ^^ {
    case opr1~lists => {
      var operand1 = opr1
      lists.foreach {
        l => l match {
          case "*"~f => { operand1 = Multiply(operand1, f) }
          case "/"~f => { operand1 = Divide(operand1, f) }
        }
      }
      operand1
    }
  }

  // factor ::= unary
  def factor : Parser[Expression] = unary

  // unary :: = "+" unary | "-" unary | primary
  def unary : Parser[Expression] =
    ("+"|"-")~unary ^^ {
      case "+"~u => Plus(u)
      case "-"~u => Minus(u)
    }| primary

  // primary ::= "(" expression ")" | value
  def primary : Parser[Expression] =
    "("~expression~")" ^^ {
      case lbrace~expr~rbrace => Parenthesized(expr)
    }|value

  // value ::= "[0-9]+"
  // floatingPointNumberは浮動小数点を表す正規表現のParser
  def value : Parser[Expression] =
  fpn ^^ {
    n => Value(BigDecimal(n, MathContext.DECIMAL32))
  }

  def fpn: Parser[String] =
    """-?(\d+(\.\d*)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
}

それから、パース後に数式の答えを出すのは以下のvisitorパターンを使います。

class Evaluator extends ExpressionVisitor {
  // ケースクラスの抽出(unapply)を使って内包されている式を取り出してさらに訪問する
  override def visit(e:Expression) : BigDecimal = e match {
    case Value(digit) => digit
    case Add(l,r) => l.accept(this) + r.accept(this)
    case Sub(l,r) => l.accept(this) - r.accept(this)
    case Multiply(l,r) => l.accept(this) * r.accept(this)
    case Divide(l,r) => l.accept(this) / r.accept(this)
    case Minus(e) => e.accept(this) * -1
    case Plus(e) => e.accept(this)
    case Parenthesized(e) => e.accept(this)
  }
}

整形表示なしで式を計算するだけなら、以下のようになります。

val parser = new ExprParser
val parseResult = parser.parse("3/2 + 5 * (24 + 7) /2")

if ( parseResult.successful ){
  parseResult.get.accept(new Evaluator)
}else{
  ""
}

次に、数式の整形表示についてですが、表示用の情報はElementオブジェクトを拡張したクラスを使用して使います。

object Element{
  private class ArrayElement(val contents: Array[String]) extends Element
  private class LineElement(s: String) extends Element{
    val contents = Array(s)
    override def width = s.length
    override def height = 1
  }
  private class UniformElement(
                                ch:Char,
                                override val width: Int,
                                override val height:Int
                              )extends Element{
    private val line = ch.toString * width
    def contents = Array.fill(height)(line)
  }
  def elem(contents:Array[String]):Element =
    new ArrayElement(contents)
  def elem(chr: Char, width: Int, height: Int ): Element =
    new UniformElement(chr, width, height)
  def elem(line: String): Element =
    new LineElement(line)
}

表示用のクラスには数式の一部が格納されていて、その一部の数式同士の位置を調整するためのメソッド群をElementのabstractクラスに持たせておきます。分子を分母の上に表示するのにはここのクラスのメソッドを使用します。

abstract class Element {
  import util.Element.elem

  def contents: Array[String]
  def width: Int = contents(0).length
  def height: Int = contents.length

  def above(that: Element): Element = {
    val this1 = this widen that.width
    val that1 = that widen this.width
    assert(this1.width == that1.width)
    elem(this1.contents ++ that1.contents)
  }

  def beside(that: Element): Element = {
    val this1 = this heighten that.height
    val that1 = that heighten this.height
    elem(
      for((line1, line2) <- this1.contents zip that1.contents)
        yield  line1 + line2)
  }

  def widen(w:Int):Element =
    if(w<=width) this
    else{
      val left = elem(' ', (w-width)/2, height)
      val right = elem(' ', w - width - left.width, height)
      left beside this beside right
    }

  def heighten(h: Int): Element =
    if(h <= height)this
    else{
      val top = elem(' ', width, (h - height)/2)
      val bot = elem(' ', width, h - height - top.height)
      top above this above bot
    }
  override def toString = contents mkString "\n"
}

それからパースされた数式のクラスを表示用クラスに変換するExprFormatterクラスを作成します。Scalaのパターンマッチを使って入れ子構造のクラスを再帰的に評価しています。

class ExprFormatter {
  private val fractionPrecedence = -1

  private def format(e: Expression, enclPrec: Int): Element =
    e match {
      case Value(num) =>
        elem(" " + num.toString + " ")
      case Add(x, y) =>
        val l = format(x, fractionPrecedence)
        val op = elem("+")
        val r = format(y, fractionPrecedence)
        l beside op beside r
      case Sub(exp1, exp2) =>
        val l = format(exp1, fractionPrecedence)
        val op = elem("-")
        val r = format(exp2, fractionPrecedence)
        l beside op beside r
      case Plus(exp) =>
        val op = elem("+")
        val ex = format(exp, fractionPrecedence)
        op beside ex
      case Minus(exp) =>
        val op = elem("-")
        val ex = format(exp, fractionPrecedence)
        op beside ex
      case Parenthesized(exp) =>
        val l = elem("(")
        val ex = format(exp)
        val r = elem(")")
        l beside ex beside r
      case Multiply(exp1, exp2) =>
        val l = format(exp1, fractionPrecedence)
        val op = elem("x")
        val r = format(exp2, fractionPrecedence)
        l beside op beside r
      case Divide(exp1, exp2) =>
        val top = format(exp1, fractionPrecedence)
        val bot = format(exp2, fractionPrecedence)
        val line = elem('-', top.width max bot.width, 1)
        val frac = top above line above bot
        if (enclPrec != fractionPrecedence) frac
        else frac
      case ExecResult(exp, result) =>
        val e = format(exp, fractionPrecedence)
        val eq = elem("=")
        val r = format(result, fractionPrecedence)
        e beside eq beside r
    }
  def format(e: Expression): Element = format(e, 0)
}

最後に数式のパース,整形表示は以下のように利用します。

trait MathParser {
  def mathParse(input: String) = {
    val f: ExprFormatter = new ExprFormatter
    val parser = new ExprParser
    val parseResult = parser.parse(input)

    if ( parseResult.successful ){
      val evalResult = parseResult.get.accept(new Evaluator)
      f.format(ExecResult(parseResult.get, Value(evalResult)))
    }else{
      ""
    }
  }
}

これで以下のように入力を行うと

3/2 + 5 * (24 + 7) /2

こんな風に整形して出力されることが確認できます。

3   5 x( 24 + 7 )       
---+--------------= 79.0
2        2              

他の言語でDSLを実装したことがないので比較はできませんが、Scalaを使うことでASTで表した構造をそのままコードに落とし込むイメージがつきやすい気がしたのでパーサの構築の敷居が大分下がっているように思いました。

Postgreyについて

Postgrey

Postgreyとはpostfixを使用する時に初めて来る相手からは一定の待ち時間後に再送が来なければ受信を受け付けないといったもので、phpとかで大量のスパムメールを送りつけてきたのを受診させないためのものになっています。 インストールから使用するまでを簡単にまとめると以下のようになります。

インストール〜使用開始まで(CentOS7系で確認)

# yum install epel-release
# vi /etc/yum.repos.d/epel.repo

enabled=0

# yum –enablerepo=epel install postgrey
# vi /etc/sysconfig/postgrey

POSTGREY_OPTS="--inet=10023 --delay=1800" #メインサーバの設定にあわせる

○postgrey起動
# systemctl start postgrey
# systemctl enable postgrey

グレイリスティングの対象について

上記設定を行うと初めてくる相手からは以下のようにログが出力されるようになっています。

Sep 11 07:52:43 xxx postgrey[31898]: action=greylist, reason=new, client_name=xxx, client_address=xxx, sender=xxx, recipient=xxx

“action=greylist, reason=new"となっていますが、これは初めてメールを送ってきた相手のため一旦受診を拒否したといった内容になっております。ここでいう初めて送ってきた相手についてですが、最近まではメールサーバ間でのやり取りがあったかどうかだと思っていたのですが実際は違っていたようでして、それが確認できるログは以下のようになっています。

Sep 11 08:01:05 xxx postgrey[31898]: action=pass, reason=triplet found, delay=508, client_name=xxx, client_address=xxx, sender=xxx, recipient=xxx

これはPostgreyのデータベース内にtriplet(送信元サーバIP, 送信者アドレス、 受信者アドレスの組み合わせ)が存在したため、グレイリスティングにパスしたということを表しています。自分はこのログを見て気づいたのですがどうやら送信アドレス、受信者アドレス、送信サーバ毎で待ち時間が発生するようです。