Cytoscape.jsを試してみた
Cytoscape.jsとは
もともとはクライアントアプリとして欧米の研究機関によって開発されているオープンソースのネットワーク可視化ソフトウェアプラットフォームであるCytoscapeが10年ほど前に作られ現在も継続して開発が行われています。Cytoscapeはデータの読み込み、分析、可視化といった一連の処理を行うことができますが、分析は別のソフト例えばRなどを使い、それから分析結果をエクスポートしてCytoscapeで可視化と言った使われ方もあるようです。クライアントアプリでは分析結果の共有に不便であるためカナダのトロント大学Cytoscape.jsのを開発を始めたようです。
以下のデモを見るのが一番わかりやすいと思います。
js.cytoscape.org
Cytoscapeでの処理フローについて
下記URLのCytoscapeの開発者の記事によりますと以下のような処理フローになるようです。
http://qiita.com/keiono/items/0db6495c4d8bc215de67
ネットワークデータを読み込む アトリビュート(データテーブル)を読み込む ソーシャルネットワークの例で言えば 人の名前、年齢、職業、性別などがノードアトリビュート 婚姻関係、友人関係、メールの交換された数などといった関係性に関する情報がエッジアトリビュート 統合されたデータを使って、統計的な解析やフィルタリングなどを行い分析する 自動レイアウト機能を使って、ノードを見やすく配置する ノード、エッジに付随する各種データに基づきマッピングを設定し、描画を行う PDFやJavaScriptウィジェットに出力する
Cytoscape.jsを試してみる
では実際にCytoscape.jsを試しに動かしてみたいと思います。
ここのチュートリアルを試してます。
1.ソース取得
ソースををgithubからダウンロードしておきます。
https://github.com/cytoscape/cytoscape.js
2.html作成
以下のhtmlを記述します。srcのパスはダウンロードしたCytoscape.jsのdist/cytoscape.min.jsを参照するように修正しておいてください。
<!doctype html> <html> <head> <title>Tutorial 1: Getting Started</title> <script src="cytoscape.js"></script> </head> <style> #cy { width: 100%; height: 100%; position: absolute; top: 0px; left: 0px; } </style> <body> <div id="cy"></div> </body> </html>
divタグに対してcanvasを埋め込んでグラフの描画を行いまして、cssで表示領域を設定しています。
3.グラフインスタンスの作成
var cy = cytoscape({ container: document.getElementById('cy'), elements: [ { data: { id: 'a' } }, { data: { id: 'b' } }, { data: { id: 'ab', source: 'a', target: 'b' } }] });
上記スクリプトで描画するノードとエッジを指定しています。data: { id: ‘a’ } 、data: { id: ‘b’ } とありますので2つのノードが描画されます。それから、data: {id: ‘ab’,source: ‘a’,target: ‘b’}とありますので2つのノードを結んだエッジも表示されます。
先ほど作成したhtmlにスクリプトを記述して開いてみるとグラフが表示されたことが確認できます。cytoscape.jsには自動レイアウト機能がありますのでウィンドウのサイズを変更して開き直すとノードが縦に並んだり、横に並んだりして自動でレイアウトして描画されることが確認できます。
4.グラフの描画設定 3で描画したスクリプトのelementsに続けて以下のstyleを追記してください。
style: [ { selector: 'node', style: { shape: 'hexagon', 'background-color': 'red', label: 'data(id)' } }]
それから開き直すとノードが赤色六角形のID付きで表示されることが確認できたかと思います。
styleについて他にどんな設定があるかは以下を見たら良いと思います。
https://github.com/cytoscape/cytoscape.js/blob/master/documentation/md/style.md
5.ノードの配置設定
ノードをどのように配置するかはlayoutで指定できるようになっています。例えばノードを円状に並べて描画したい場合は、以下を追記したら行えます。
cy.layout({ name: 'circle' });
円状に並べたのをわかりやすくするためhtmlを編集しノードを増やしてみます。
<!doctype html> <html> <head> <title>Tutorial 1: Getting Started</title> <script src="../cytoscape.js/dist/cytoscape.min.js"></script> </head> <style> #cy { width: 100%; height: 100%; position: absolute; top: 0px; left: 0px; } </style> <body> <div id="cy"></div> <script> var cy = cytoscape({ container: document.getElementById('cy'), elements: [ { data: { id: 'a' } }, { data: { id: 'b' } }, { data: { id: 'ab', source: 'a', target: 'b' } }], style: [ { selector: 'node', style: { shape: 'hexagon', 'background-color': 'red', label: 'data(id)' } }] }); for (var i = 0; i < 10; i++) { cy.add({ data: { id: 'node' + i } } ); var source = 'node' + i; cy.add({ data: { id: 'edge' + i, source: source, target: (i % 2 == 0 ? 'a' : 'b') } }); } cy.layout({ name: 'circle' }); </script> </body> </html>
これで開きなおしてみると以下のように円状に並んで表示されることが確認できます。
レイアウトの描画設定は自分で行うこともできここにまとめられているのですがcircleに配置するのであれば、以下の設定でも同様となっております。
var options = { name: 'circle', fit: true, // whether to fit the viewport to the graph padding: 30, // the padding on fit boundingBox: undefined, // constrain layout bounds; { x1, y1, x2, y2 } or { x1, y1, w, h } avoidOverlap: true, // prevents node overlap, may overflow boundingBox and radius if not enough space radius: undefined, // the radius of the circle startAngle: 3 / 2 * Math.PI, // where nodes start in radians sweep: undefined, // how many radians should be between the first and last node (defaults to full circle) clockwise: true, // whether the layout should go clockwise (true) or counterclockwise/anticlockwise (false) sort: undefined, // a sorting function to order the nodes; e.g. function(a, b){ return a.data('weight') - b.data('weight') } animate: false, // whether to transition the node positions animationDuration: 500, // duration of animation in ms if enabled animationEasing: undefined, // easing of animation if enabled ready: undefined, // callback on layoutready stop: undefined // callback on layoutstop }; cy.layout( options );
6.ノードごとでスタイルを変えてみる
ノードごとでスタイルを変更したい場合があると思うのですが、その場合はノードに付与したIDを指定してstyle属性に設定を追加してあげれば行えます。
例えばID=node1に対してスタイルを設定したい場合は以下のようになります。
{ selector: '#node1', style: { 'background-image': 'https://farm8.staticflickr.com/7272/7633179468_3e19e45a0c_b.jpg', 'height': 80, 'width': 80, 'background-fit': 'cover', 'border-color': '#000', 'border-width': 3, 'border-opacity': 0.5 }
結果、このようになりスタイルが適用されているのが確認できます。
触ってみた感想として表示するデータに合わせて柔軟にレイアウトを調整するというよりかは、事前に表示するデータを想定した上でそれに合わせてレイアウトをどう調整するのかが重要そうな気がしました。他の分析ソフトからエッジとノードあとアトリビュートをエクスポートできたら分析結果の共有に便利そうです。
Rでネットワーク分析をしてみる
ちょっと興味が湧いたのでigraphを使ってネットワーク分析を行ってみたいと思います。
igraph
igraphとは?
ネットワーク解析周りの関数が多数登録されており、簡単にネットワーク解析を行うことができる。最近ではグラフデータベースのneo4jなどでネットワーク分析をするのが流行ってそうですが、簡単に試す場合はRとかapach sparkを使う方が楽そうな気がします。
インストール
install.packages(“igraph”)
バージョン確認
packageVersion(“igraph”)
パッケージの読みこみ
library(igraph)
サンプルデータで動かしてみる
以下のサイトから"Zachary’s karate club"をダウンロードする。データの中身は大学の空手部の交友関係情報らしいです。
http://www-personal.umich.edu/~mejn/netdata/
igraphを使う
install.packages(“igraph”)
library(“igraph”)
データを読み込む
“Zachary’s karate club"のデータをダウンロード後解答したkarate.gmlがグラフの情報を表すデータになります。試しに開いてみると以下のようにノードとエッジの情報を持った有向グラフを表していることがわかります。
Creator "Mark Newman on Fri Jul 21 12:39:27 2006" graph [ node [ id 1 ] node [ id 2 ] ~~~ edge [ source 34 target 32 ] edge [ source 34 target 33 ] ]
今回はノードにラベルの情報が含まれていないため、グラフ表示する際に名前の情報は含まれません。ノードにラベルを含む場合は以下のようにnodeの要素にlabelの項目を追加する形式で保存されます。
Creator "Mark Newman on Sat Jul 22 05:32:16 2006" graph [ directed 0 node [ id 0 label "BrighamYoung" value 7 ] node [ id 1 label "FloridaState" value 0 ] ~~~
ちなみにsourceとtargetが同一のエッジが複数あった場合強いつながりとして処理してくれてそうだった。
gml形式のデータについて詳しくは下記参照
http://graphml.graphdrawing.org/
今回はすでにノードとエッジの情報は分析済みのgml形式のデータでRStudioから表示させる動きを試してみたいと思います。 グラフデータの読み込みはread.graphを使います。
データの可視化
plot関数で読み込んだグラフを可視化します。
plot(karate_gh, vertex.size=4, edge.arrow.size=0.2, layout=layout.fruchterman.reingold)
コミュニティ分析
leading.eigenvector.community関数によりグラフのデータからコミュニティの情報を得ます。得たデータをplot関数に利用することでコミュニティごとで色分けした表示が行えます。
karate_com <- leading.eigenvector.community(karate_gh)
plot(karate_gh, vertex.size=4, edge.arrow.size=0.2, vertex.color=karate_com$membership, layout=layout.fruchterman.reingold)
中心性解析
page.rank関数によりどれだけコミュニティの中心に位置していかを数値化できるので、これをプロット時のノードのサイズとして表すことで中心性解析の可視化が行える。
karate_pr <- page.rank(karate_gh, directed=TRUE)
plot(karate_gh, vertex.size=karate_pr$vector*200, vertex.color=karate_com$membership, edge.arrow.size=0.2, layout=layout.fruchterman.reingold)
とりあえずノードとエッジがはっきりしたgml形式のデータがあればRで簡単に可視化できることがわかった。ブラウザ上でデータをみたい場合は、以下のossが使えそう。
js.cytoscape.org
Chainerによるロジスティック回帰
PythonのディープラーニングフレームワークであるChainerを使って簡単なセンチメント分析を行ってみたいと思います。
必要なモジュールのインポート
# import chainer module import numpy as np import chainer from chainer import cuda, Function, gradient_check, Variable from chainer import optimizers, serializers, utils from chainer import Link, Chain, ChainList import chainer.functions as F import chainer.links as L # import mecab module import re import MeCab import json
それからロジスティック回帰用のモデルを定義します。レイヤーは一つで入力が単語の種類数で、出力がクラスの数で今回はポジティブ、ネガティブの2つになるように引数で渡します。単語の種類数は学習データを読み無彩にカウントします。
# ロジスティック回帰を行います class MyRogistic(Chain): # パラメータ数に語彙の数, 分類結果数を受け取ります。 def __init__(self, vocab_count, class_count): # モデルを定義しています。中間層もない1層ネットワークです。 super(MyRogistic, self).__init__( l1=L.Linear(vocab_count,class_count), ) def __call__(self,x,y): return F.mean_squared_error(self.fwd(x), y) # 誤差関数はソフトマックスです def fwd(self,x): return F.softmax(self.l1(x))
次に入力データから単語の頻出数をカウントできるようにするためのメソッドを準備しておきます。判定に関係なさそうな単語は除外するようにします。
# 形態素解析を行うための関数を定義しておきます def _mecab_parse_feat(feat): return dict(zip(_mecab_feat_labels, feat.split(','))) def _mecab_node2seq(node, decode_surface=True, feat_dict=True, mecab_encoding='utf-8'): # MeCab.Nodeはattributeを変更できない。 while node: error_count = 0 try: if decode_surface: node._surface = node.surface if feat_dict: # 品詞の情報をdictで保存 node.feat_dict = _mecab_parse_feat( node.feature ) yield node except: error_count += 1 node = node.next #回帰の邪魔になるストップワードは除外するようにします。 def is_stopword(n): # <- mecab node if len(n._surface) == 0: return True elif re.search(u'^[\s!-@\[-`\{-~ 、-〜!-@[-`]+$', n._surface): return True elif re.search(u'^(接尾|非自立)', n.feat_dict['cat1']): return True elif u'サ変・スル' == n.feat_dict['conj'] or u'ある' == n.feat_dict['orig']: return True elif re.search(u'^(名詞|動詞|形容詞)', n.feat_dict['pos']): return False else: return True def not_stopword(n): # <- mecab node return not is_stopword(n) def node2word(n): # <- mecab node return n._surface def node2norm_word(n): # mecab node if n.feat_dict['orig'] != '*': return n.feat_dict['orig'] else: return n._surface def word_segmenter_ja(sent, node_filter=not_stopword, node2word=node2norm_word, mecab_encoding='utf-8'): if type(sent) == "unicode": sent = sent.encode(mecab_encoding) nodes = list( _mecab_node2seq(_mecab.parseToNode(sent), mecab_encoding=mecab_encoding) ) if node_filter: nodes = [n for n in nodes if node_filter(n)] words = [node2word(n) for n in nodes] return words
日本語の構文解析に使うmecabを初期化します。辞書にはneologdを使用しています。
# 形態素解析の辞書にneologdを使用します tagger = MeCab.Tagger(' -Ochasen -d /usr/local/lib/mecab/dic/mecab-ipadic-neologd') _mecab = MeCab.Tagger() # 品詞,品詞細分類1,品詞細分類2,品詞細分類3,活用形,活用型,原形,読み,発音 _mecab_feat_labels = 'pos cat1 cat2 cat3 conj conj_t orig read pron'.split(' ')
学習データを初期化します。本番ではDBに保存されているデータをここで読み込むようにします。
# 学習用データ準備 text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。""" text2 = """ 昔見た近未来に一つ近づいたと思います。""" text3 = """ 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。 正直信者が騒いでいるだけに思います。 """ text4 = """値段は下がるはずなので今は待ちだと思います。遊ぶにもソフトが少ないですし。 """ learn_docs = [ word_segmenter_ja(text1), word_segmenter_ja(text2), word_segmenter_ja(text3), word_segmenter_ja(text4) ] # 学習データをポジティブ0, ネガティブ1の何方かに分けるようにします ans_index = [0,0,1,1] # 語彙の数を求めるのに使う def word_vocabularies(data): vocabularies = {} word_id = 0 for doc in data: for word in doc: if word not in vocabularies: vocabularies[word] = word_id word_id += 1 return vocabularies # データ数 data_num = len(learn_docs) # 分類するクラス数 今回はポジティブ、ネガティブの2値分類です class_num = 2 word_vocab = word_vocabularies(learn_docs) vocab_count = len(word_vocab) learn_data = np.zeros(vocab_count * data_num).reshape(data_num, vocab_count).astype(np.float32) # 学習時に使う入力データを初期化します for i in range(len(learn_docs)): for word in learn_docs[i]: learn_data[i, word_vocab[word]] += 1.0 # 学習時に使う答えのデータを初期化します learn_ans = np.zeros(class_num * data_num).reshape(data_num,class_num).astype(np.float32) for i in range(data_num): learn_ans[i,np.int(ans_index[i])] = 1.0
ロジスティック回帰のモデルを初期化し、学習を行います。
# モデルを初期化します。 model = MyRogistic(vocab_count, class_num) # パラメータの最適化アルゴリズムにAdamを使います optimizer = optimizers.Adam() optimizer.setup(model) #学習します # 学習データをランダムにサンプリングするために使います。 n = 4 # バッチサイズです。今回は学習データが4つしかないのでほとんど意味ないです bs = 25 for j in range(5000): # インデックスをランダムにします sffindx = np.random.permutation(n) accum_loss = None for i in range(0, n, bs): # ミニバッチを取得 x = Variable(learn_data[sffindx[i:(i+bs) if (i+bs) < n else n]]) y = Variable(learn_ans[sffindx[i:(i+bs) if (i+bs) < n else n]]) # 勾配を初期化 model.zerograds() # 順方向に計算して誤差を取得 loss = model(x,y) # 逆伝搬を行います loss.backward() # パラメータを更新します。今回はAdamを使っています。 optimizer.update() # accum_loss = loss if accum_loss is None else accum_loss + loss バッチサイズで誤差を累計 # accum_loss.backward() 逆伝搬 # optimizer.update() パラメータ更新
学習によりモデルが最適化されたので、それを使って計算を行い正しい結果が得られるか確認します。(今回は学習データが4つなのであんまり意味ないと思いますが)
# パラメータ更新後に回帰を行ってみる xt = Variable(learn_data, volatile='on') # 準伝搬での計算結果を取得 yy = model.fwd(xt) # 答え合わせ用のデータを初期化 ans = yy.data nrow, ncol = ans.shape ok = 0 ans_word=['posi', 'nega'] for i in range(nrow): # 分類結果の取得 cls = np.argmax(ans[i,:]) print(ans_word[cls]) if learn_ans[i][cls] == 1.0: ok += 1 #正解数の表示 print (ok, "/", nrow, " = ", (ok * 1.0)/nrow)
学習をしたあとはモデル、辞書を保存します。
# 学習データの保存 # モデルの保存 # Save the model and the optimizer print('save the model') serializers.save_npz('sentiment.model', model) print('save the optimizer') serializers.save_npz('sentiment.state', optimizer) # 辞書の保存 with open( 'vocab.dump', "w") as vocab_f: json.dump(str(word_vocab), vocab_f)
保存した辞書を読み込んでみます。
# 学習データの読み込み load_vocab = {} with open( 'vocab.dump', "r") as vocab_f: for data in json.load(vocab_f)[1:-1].split(","): pare_data = data.replace("'", "").split(":") load_vocab[pare_data[0].strip()] = pare_data[1].strip() vocab_count = len(load_vocab) class_count = 2 # load the model and the optimizer load_model = MyRogistic(vocab_count, class_count) load_optimizer = optimizers.Adam() load_optimizer.setup(load_model) print('load the model') serializers.load_npz('sentiment.model', load_model) print('load the optimizer') serializers.load_npz('sentiment.state', load_optimizer)
きちんと読み込まれているか試しに判定してみます。
# テストデータ test_text1 = """今までにない臨場感ですごい!効果ですけど値段に見合うだけの価値があります。""" test_text2=""" 評価は高いようですが簡単に手を出せる価格ではありません。これを買うくらいなら別のVR製品を買ってもおまけが来ます。 正直信者が騒いでいるだけに思います。 """ test_doc = [ word_segmenter_ja(test_text1), word_segmenter_ja(test_text2) ] test_data = np.zeros(vocab_count *2 ).reshape(2, vocab_count).astype(np.float32) for i in range(len(test_doc)): for word in test_doc[i]: if word in load_vocab: test_data[i,np.int(load_vocab[word])] += 1.0 xt = Variable(test_data, volatile='on') yy = load_model.fwd(xt) ans = yy.data nrow, ncol = ans.shape ans_word=['posi', 'nega'] for i in range(nrow): cls = np.argmax(ans[i,:]) print(ans_word[cls])
chainerなら基本的なことは簡単に試せそうです。
ApacheSparkの基本的なデータ操作
データ操作
Spark2.1で動作確認
build.sbt
IDEAから実行できるようにbuild.sbtに以下を追加。対象のライブラリが存在しない場合はmaven_centralで確認する。
val sparkVersion = "2.1.0" libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % sparkVersion % "compile", "org.apache.spark" % "spark-sql_2.11" % sparkVersion % "compile" )
RDD
実行時の設定の読み込み
val conf = new SparkConf().setAppName("WordCountTop3").setMaster("local[*]") val sc = new SparkContext(conf)
ハイフン区切りで単語出現数のカウント
val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.collect.foreach(println)
単語出現数をソートして上位3件を表示
val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.sortBy( _._2.toInt, ascending = false).take(3).foreach(println)
DataFrame
設定の読み込み
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") val sc = new SparkContext(conf) // spark2からはSparkSessioのほうが推奨だけどエラーで使えなかった val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Spark2.1ではSQLContextではなくSparkSessionの方を推奨しているけど、エラーが発生して利用できなかった。とりあえずSQLContextで試してみる。
RDDでファイルを読み込む
val dessertRDD = sc.textFile("data/dessert-menu.csv") .map{ record => val splitRecord = record.split(",") val menuId = splitRecord(0) val name = splitRecord(1) val price = splitRecord(2).toInt val kcal = splitRecord(3).toInt Dessert(menuId, name, price, kcal) }
RDDからDataFrameへ変換
// RDD → DataFrame val dessertDF:DataFrame = sqlContext.createDataFrame(dessertRDD)
スキーマ定義の表示
println("show schema") println(dessertDF.printSchema)
DataFrameに対してクエリを投げる。生成ずみのDataFrameに対して直接クエリを投げるのではなくワークテーブルを作成して、そちらに対してクエリを実行する。ATAN2などの組み込み関数については公式のドキュメントを確認する。
dessertDF.createOrReplaceTempView("dessert") val numOver300KcalDF = sqlContext.sql( "select count(*) as num_of_over_300Kcal FROM dessert where kcal >= 260" ) println(numOver300KcalDF.show) println(sqlContext.sql("select atan2(1, 3) as `ATAN2(1, 3)`").show)
DataFrameをjoinしてクエリを投げる。
val dessertOrderRDD = sc.textFile("data/dessert-order.csv") .map{record => val splitRecord = record.split(",") val sId = splitRecord(0) val menuId = splitRecord(1) val num = splitRecord(2).toInt DessertOrder(sId, menuId, num) } val dessertOrderDF:DataFrame = sqlContext.createDataFrame(dessertOrderRDD) dessertOrderDF.createOrReplaceTempView("desert_order") println(sqlContext.sql( """ |select do.sId | , d.name | , do.num * d.price as amout_per_menu_per_slip |from desert d |join desert_order do on d.menuId = do.menuId """.stripMargin).show
Apache Sparkを触ってみた
Apache Sparkとは?
Hadoopと同じく分散処理のフレームワーク。HadoopではMapReduceと言って複数マシンで分散処理を行ってから結果をストレージに書き出す。1回の処理では終わらない場合はデータの処理フローを形成することになり、よみ出し→分散処理→書き込みを繰り返す動きをする。MapReduceでは処理の中間結果が常にストレージに書き出されるためデータ量が大きくなっても動作し、障害からの回復も容易であると言ったメリットがある。
しかしこれではあるデータの部分集合に対し複数回で処理する場合、都度すべてのデータをストレージに書き込む処理が行われるため必要な計算コストが大きくなってしまう。
Sparkでは連続する処理の中で無駄なディスクやネットワークのI/Oを起こさないように処理することでMapReduceの問題に対処している。Sparkの高速化が期待できるのは、複数回の反復処理や連続する変換処理になります。
Sparkのデータ構造
ApacheSparkのデータ処理には「RDD」と呼ばれるデータ構造を利用する。RDDは複数のマシンから構成されるクラスタ上での分散処理を前提として設計されており、内部的にはパーティションという塊に分割される。RDDをパーティションごとに複数のマシンで処理することで分散処理が行われる。
Sparkの分散処理環境
RDDのデータ構造を分散処理するためのクラスタ管理システムには以下のようなものがある。
* YARN
Hadoopのクラスタ管理システム。Hadoopの分散ファイルシステムであるHDFSで利用するとI/Oが効率化されるらしい。
* Mesos
* Spark Standalone
Sparkに同梱されているクラスタ管理システム。別途クラスタ管理システムを用意する必要がなく手軽に利用できる。
RDDによるタスク処理
RDDには以下の情報が含まれている
* RDDの元になるデータの情報、または変換前のRDD
* RDD成の元になるデータのロード方法、変換方法
* 変換後のRDDの要素の型
* 変換後のRDDのパーティション数
* RDDが永続化されているかどうか
RDD内に含まれる上記の情報と遅延評価の性質により効率的なタスクのスケジューリングが可能となっているらしいけど詳しくは別でドキュメントを見たほうがよさそう。
http://kimutansk.hatenablog.com/entry/20130902/1378142510
https://www2.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
Sparkのマシン構成
ホスト名 | 役割 | 説明 |
---|---|---|
spark-client | クライアント | アプリケーションを起動するクライアント |
spark-master | マスターノード | クラスタ内のリソースを管理 |
spark-worker00 | ワーカーノード | 働く |
Sparkインストール
とりあえずmacにインストールしてみる
以下のページからsparkをダウンロードして解凍後binにパスを通すととりあえず動きを試すことができる
http://spark.apache.org/downloads.html
パスを通した後は以下のコマンドが実行できたらApacheSparkはインストールされている
spark-shell –version
IDEAでの開発
- ideaでSBTプロジェクトを作成
使用するscalaのバージョンは公式ページでapiをサポートしているバージョンを事前に確認しておく(2017/3/20時点ではscala2.12からは利用できなかった)
http://spark.apache.org/docs/latest/ - 実行可能jarが生成できるようにsbt-assemblyをプラグインに追加。追加方法は以下のgithubのページを参考にする。
https://github.com/sbt/sbt-assembly
plugins.sbtに以下を追加する
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
org.jboss.interceptorがunresolvedになったらplugins.sbtにさらに以下を追記
resolvers += "JBoss" at "https://repository.jboss.org"
- build.sbtのlibraryDependenciesにspark-coreを追加する。
バージョンには気をつける
libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "2.1.0" )
- abt-assemblyのオプションをbuild.sbtに追加
詳しくは以下のペジを参照。ファイル読み込み時に複数ファイルが見つかった時どうするかのために必要
https://github.com/sbt/sbt-assemblyassemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first case "application.conf" => MergeStrategy.concat case "unwanted.txt" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) }
ついでに出力するjarファイル名も設定しておくassemblyJarName in assembly := "something.jar"
- 試しにjar出力
ApacheSparkで実行可能なjarを試しに出力しておく > sbt assembly で多分失敗するので、その場合はlibraryDependenciesをprovidedにしておく。"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "provided"
それから再度sbt assemlbyを実行して所定のパスにjarが生成されることを確認。ただprovidedの場合有効なスコープがコンパイル時とテスト時になる。今回はIDEAから直接実行して簡単に動きを見るだけなので一旦以下のように変更しておく。"org.apache.spark" % "spark-core_2.11" % "2.1.0" % "compile"
簡単なスクリプトをIDEAから動かしてみる
それではSparkを実行するsbtプロジェクトが準備できたので試しにサンプルコードを動かしてみます。 以下のページにある単語の出現数をカウントするscalaのプログラムを動かしてみたいと思います。 http://spark.apache.org/examples.html
import org.apache.spark.{SparkConf, SparkContext} object spark_shell { def main(args:Array[String]): Unit ={ val conf = new SparkConf().setAppName("WordCount") val sc = new SparkContext(conf) try{ val filePath = "build.sbt" val wordAndCountRDD = sc.textFile(filePath) .flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) wordAndCountRDD.collect.foreach(println) } sc.stop() } }
まあ、単純に読み込んだファイルをスペース区切りで分割してwordごとのmapでカウントアップするだけです。
これで実行してみると以下のエラーが発生すると思います。
- SparkException: A master URL must be set in your configuration
このエラーについてですが、sparkのタスクを実行する場合spark-clientからspark-masterを指定してプログラム実行の流れになるのですが、IDEAでそのまま実行するだけではmaster未指定のため上記エラーが発生すると思われます。本番環境のクラスタリング構成で動かす場合はpark-submitのオプションで指定するのかと思います。 http://spark.apache.org/docs/latest/submitting-applications.html
今回は手軽に動きを試したいのでプログラム中のSparkConfを以下のように変更しmasterとして自分自身を指定して動きを見たいと思います。
val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
それから実行してみると環境情報等読み込んだ後に以下のような出力結果が得られると思います。
(oldStrategy,1) (x,1) (endsWith,1) ("2.11.8",1) (assembly).value,1) (scalaVersion,1) (,97) (@,2) ("unwanted.txt",1) (libraryDependencies,1) ("spark-core_2.11",1) ("2.9.7",1) ("joda-time",2) (PathList(ps,1) (_*),2) (assembly,3)
Scalaで数式をパースして整形表示
数式パーサ
Scalaのコップ本を一通り読んだので振り返りも兼ねて数式をパースして計算、整形するDSLを実装したいと思います。全部自力で作れたら良いのですが、ソースコードはほとんど(http://blog.j5ik2o.me/entry/20101126/1290763057) とコップ本から拝借しています。
パースした数式は探索して評価を行うので、式を表すExpressionトレイトと訪問するExpressionVisitorトレイトを用意します。
// 式を訪問するビジター trait ExpressionVisitor { def visit(e:Expression):BigDecimal } // 式を表すトレイト trait Expression { def accept(visitor:ExpressionVisitor):BigDecimal = { visitor.visit(this) } }
それから各数式を表すのに使うケースクラスを定義しておきます。
// 各式を表すケースクラス。これを使ってAST(抽象構文木)を作ります。 case class Value(digit:BigDecimal) extends Expression case class Add(expr1:Expression, expr2:Expression) extends Expression case class Sub(expr1:Expression, expr2:Expression) extends Expression case class Plus(expr:Expression) extends Expression case class Minus(expr:Expression) extends Expression // カッコで囲んでいる部分 case class Parenthesized(expr:Expression) extends Expression case class Multiply(expr1:Expression, expr2:Expression) extends Expression case class Divide(expr1:Expression, expr2:Expression) extends Expression // 数式と評価結果を表す case class ExecResult(exp:Expression, result: Value) extends Expression
RegexParsersを使って数式をパースします。構文解析にはASTというものを使います。数式のクラスは入れ子構造になるのですが、ASTで解析する時はまず+,-次に*,/それからカッコというように優先度の低いものを先に評価することで正しくパースが行えるようです。この辺りは既にあるものを見るならなんとなくわかるけど、自分で一から考えるのは難しそう
// 式のパーサ class ExprParser extends RegexParsers { def parse(data:String) = parseAll(expression, data) // expression ::= term { "+" term | "-" term } def expression : Parser[Expression] = term~rep("+"~term | "-"~term) ^^ { case opr1~lists => { var operand1 = opr1 lists.foreach { l => l match { case "+"~f => { operand1 = Add(operand1, f) } case "-"~f => { operand1 = Sub(operand1, f) } } } operand1 } } // term ::= factor { "*" factor | "/" factor } def term : Parser[Expression] = factor~rep("*"~factor | "/"~factor) ^^ { case opr1~lists => { var operand1 = opr1 lists.foreach { l => l match { case "*"~f => { operand1 = Multiply(operand1, f) } case "/"~f => { operand1 = Divide(operand1, f) } } } operand1 } } // factor ::= unary def factor : Parser[Expression] = unary // unary :: = "+" unary | "-" unary | primary def unary : Parser[Expression] = ("+"|"-")~unary ^^ { case "+"~u => Plus(u) case "-"~u => Minus(u) }| primary // primary ::= "(" expression ")" | value def primary : Parser[Expression] = "("~expression~")" ^^ { case lbrace~expr~rbrace => Parenthesized(expr) }|value // value ::= "[0-9]+" // floatingPointNumberは浮動小数点を表す正規表現のParser def value : Parser[Expression] = fpn ^^ { n => Value(BigDecimal(n, MathContext.DECIMAL32)) } def fpn: Parser[String] = """-?(\d+(\.\d*)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r }
それから、パース後に数式の答えを出すのは以下のvisitorパターンを使います。
class Evaluator extends ExpressionVisitor { // ケースクラスの抽出(unapply)を使って内包されている式を取り出してさらに訪問する override def visit(e:Expression) : BigDecimal = e match { case Value(digit) => digit case Add(l,r) => l.accept(this) + r.accept(this) case Sub(l,r) => l.accept(this) - r.accept(this) case Multiply(l,r) => l.accept(this) * r.accept(this) case Divide(l,r) => l.accept(this) / r.accept(this) case Minus(e) => e.accept(this) * -1 case Plus(e) => e.accept(this) case Parenthesized(e) => e.accept(this) } }
整形表示なしで式を計算するだけなら、以下のようになります。
val parser = new ExprParser val parseResult = parser.parse("3/2 + 5 * (24 + 7) /2") if ( parseResult.successful ){ parseResult.get.accept(new Evaluator) }else{ "" }
次に、数式の整形表示についてですが、表示用の情報はElementオブジェクトを拡張したクラスを使用して使います。
object Element{ private class ArrayElement(val contents: Array[String]) extends Element private class LineElement(s: String) extends Element{ val contents = Array(s) override def width = s.length override def height = 1 } private class UniformElement( ch:Char, override val width: Int, override val height:Int )extends Element{ private val line = ch.toString * width def contents = Array.fill(height)(line) } def elem(contents:Array[String]):Element = new ArrayElement(contents) def elem(chr: Char, width: Int, height: Int ): Element = new UniformElement(chr, width, height) def elem(line: String): Element = new LineElement(line) }
表示用のクラスには数式の一部が格納されていて、その一部の数式同士の位置を調整するためのメソッド群をElementのabstractクラスに持たせておきます。分子を分母の上に表示するのにはここのクラスのメソッドを使用します。
abstract class Element { import util.Element.elem def contents: Array[String] def width: Int = contents(0).length def height: Int = contents.length def above(that: Element): Element = { val this1 = this widen that.width val that1 = that widen this.width assert(this1.width == that1.width) elem(this1.contents ++ that1.contents) } def beside(that: Element): Element = { val this1 = this heighten that.height val that1 = that heighten this.height elem( for((line1, line2) <- this1.contents zip that1.contents) yield line1 + line2) } def widen(w:Int):Element = if(w<=width) this else{ val left = elem(' ', (w-width)/2, height) val right = elem(' ', w - width - left.width, height) left beside this beside right } def heighten(h: Int): Element = if(h <= height)this else{ val top = elem(' ', width, (h - height)/2) val bot = elem(' ', width, h - height - top.height) top above this above bot } override def toString = contents mkString "\n" }
それからパースされた数式のクラスを表示用クラスに変換するExprFormatterクラスを作成します。Scalaのパターンマッチを使って入れ子構造のクラスを再帰的に評価しています。
class ExprFormatter { private val fractionPrecedence = -1 private def format(e: Expression, enclPrec: Int): Element = e match { case Value(num) => elem(" " + num.toString + " ") case Add(x, y) => val l = format(x, fractionPrecedence) val op = elem("+") val r = format(y, fractionPrecedence) l beside op beside r case Sub(exp1, exp2) => val l = format(exp1, fractionPrecedence) val op = elem("-") val r = format(exp2, fractionPrecedence) l beside op beside r case Plus(exp) => val op = elem("+") val ex = format(exp, fractionPrecedence) op beside ex case Minus(exp) => val op = elem("-") val ex = format(exp, fractionPrecedence) op beside ex case Parenthesized(exp) => val l = elem("(") val ex = format(exp) val r = elem(")") l beside ex beside r case Multiply(exp1, exp2) => val l = format(exp1, fractionPrecedence) val op = elem("x") val r = format(exp2, fractionPrecedence) l beside op beside r case Divide(exp1, exp2) => val top = format(exp1, fractionPrecedence) val bot = format(exp2, fractionPrecedence) val line = elem('-', top.width max bot.width, 1) val frac = top above line above bot if (enclPrec != fractionPrecedence) frac else frac case ExecResult(exp, result) => val e = format(exp, fractionPrecedence) val eq = elem("=") val r = format(result, fractionPrecedence) e beside eq beside r } def format(e: Expression): Element = format(e, 0) }
最後に数式のパース,整形表示は以下のように利用します。
trait MathParser { def mathParse(input: String) = { val f: ExprFormatter = new ExprFormatter val parser = new ExprParser val parseResult = parser.parse(input) if ( parseResult.successful ){ val evalResult = parseResult.get.accept(new Evaluator) f.format(ExecResult(parseResult.get, Value(evalResult))) }else{ "" } } }
これで以下のように入力を行うと
3/2 + 5 * (24 + 7) /2
こんな風に整形して出力されることが確認できます。
3 5 x( 24 + 7 ) ---+--------------= 79.0 2 2
他の言語でDSLを実装したことがないので比較はできませんが、Scalaを使うことでASTで表した構造をそのままコードに落とし込むイメージがつきやすい気がしたのでパーサの構築の敷居が大分下がっているように思いました。
Postgreyについて
Postgrey
Postgreyとはpostfixを使用する時に初めて来る相手からは一定の待ち時間後に再送が来なければ受信を受け付けないといったもので、phpとかで大量のスパムメールを送りつけてきたのを受診させないためのものになっています。 インストールから使用するまでを簡単にまとめると以下のようになります。
インストール〜使用開始まで(CentOS7系で確認)
# yum install epel-release
# vi /etc/yum.repos.d/epel.repo
enabled=0
# yum –enablerepo=epel install postgrey
# vi /etc/sysconfig/postgrey
POSTGREY_OPTS="--inet=10023 --delay=1800" #メインサーバの設定にあわせる
○postgrey起動
# systemctl start postgrey
# systemctl enable postgrey
グレイリスティングの対象について
上記設定を行うと初めてくる相手からは以下のようにログが出力されるようになっています。
Sep 11 07:52:43 xxx postgrey[31898]: action=greylist, reason=new, client_name=xxx, client_address=xxx, sender=xxx, recipient=xxx
“action=greylist, reason=new"となっていますが、これは初めてメールを送ってきた相手のため一旦受診を拒否したといった内容になっております。ここでいう初めて送ってきた相手についてですが、最近まではメールサーバ間でのやり取りがあったかどうかだと思っていたのですが実際は違っていたようでして、それが確認できるログは以下のようになっています。
Sep 11 08:01:05 xxx postgrey[31898]: action=pass, reason=triplet found, delay=508, client_name=xxx, client_address=xxx, sender=xxx, recipient=xxx
これはPostgreyのデータベース内にtriplet(送信元サーバIP, 送信者アドレス、 受信者アドレスの組み合わせ)が存在したため、グレイリスティングにパスしたということを表しています。自分はこのログを見て気づいたのですがどうやら送信アドレス、受信者アドレス、送信サーバ毎で待ち時間が発生するようです。