自作のパーサーでログファイルをパースしてEmbulk経由でElasticsearchに取り込んでみる
例えば以下のようなAAA ~ CCCに対してaddやremove、setの操作を行う時系列のログデータをもとに、現在の値がどうなっているのかを手動で見ていくのは大変かと思います。
2020-05-24 11:25:18,784 INFO jp.co.teruuu.LoggingService [main] add BAA:99.674 2020-05-24 11:25:18,784 INFO jp.co.teruuu.LoggingService [main] set BBB:5.730 2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] hello hello CCC 2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] move from ABCtoCAC : 68.080 2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] add ACC:20.390 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello ABA! 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] add AAA:54.581 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] globalRemove 46.694 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello world AAC 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello BCB! 2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello ABB! 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] set AAC:59.363 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] globalAdd 63.874 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] set CCC:85.101 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello hello ABC! 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello hello CBC! 2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello world ACC! 2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] globalAdd 55.108 2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] move from BBBtoBAC : 46.040 2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] move from CAAtoCCC : 90.571 2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] add BCA:36.741
これを解決するためにログファイルをパースしてElasticsearchに取り込んでみるのを試してみたいと思います。 ちなみにですが今回の検証に使うログは自前で生成したものを使っています。 string_generator/LoggingService.java at master · teruuuuuu/string_generator · GitHub
今回の目的としては対象を絞って時系列のデータを追跡しやすくしたら良いのでパースした後ドキュメントモデルのDBなどに取り込めれば良いと思い、またリアルタイムで集計する必要もなくバッチで処理したかったのでEmbulkを使うことにしました。またログのフォーマットは特に決まっておらず、操作事で自由に出力しているものとします。
Embulkのプラグインには以下のものがありまして、FILE PARSERを見てみますが自由な出力のログに対してうまく取り込める物はさすがになさそうです。 plugins.embulk.org
なので自前でパーサーを作成し、ログ -> jsonの変換を行い、それからEmbulk経由でElasticsearchにデータを取り込みたいと思います。
とりあえず以下のようにTimestamp, LogLevel, name, スレッド, ログ出力内容にパースしておきます。
case class Log4jLine(date: LocalDateTime, logLevel: LogLevel, name: String, thread: String, content: String) extends LogLine
logging_converter/Log4jParser.scala at 3ec8071cdbc04207d3effc56e81e08734323ea07 · teruuuuuu/logging_converter · GitHub
結果は以下のようになりました。操作の内容で分けておきたかったのですが、今回はcontentにログの出力をまとめていてとりあえずjsonで出力できました。
{"content":"move from CAB to CAA : 17.383","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"} {"content":"hello hello BAA","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"} {"content":"globalAdd 8.308","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"} {"content":"hello hello BAA!","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"} {"content":"globalRemove 61.999","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"} {"content":"move from BCB to BCC : 96.230","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"} {"content":"hello hello CAA!","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"} {"content":"remove BBA:41.788","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"} {"content":"globalAdd 89.201","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"remove CCA:90.795","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"globalAdd 45.250","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"globalAdd 23.735","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"move from ACB to CBA : 77.454","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"hello world ACB","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"set CCC:25.078","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"move from CAC to CBC : 83.523","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"} {"content":"move from BCC to CCA : 86.934","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000205"}
次にEmbulkでjsonのデータをelasticserchに取り込むのですが、それぞれの環境構築は以下を参考にしました。
□Embulk
github.com □Elasticsearch
Embulkに必要なプラグインは以下のコマンドでインストールしました。
embulk gem install embulk-parser-jsonl embulk gem install embulk-output-elasticsearch
Embulkの設定ファイルとして以下のseed.ymlを作成しpath_prefixで作成したjsonを参照できるようにしておきます。
in: type: file path_prefix: 'C:\mywork\embulk\.\json\json\result_' parser: type: jsonl root: $.logs schema: - {name: timestamp, type: string} - {name: level, type: string} - {name: package, type: string} - {name: thread, type: string} - {name: content, type: string} out: type: elasticsearch nodes: - {host: localhost, port: 9200} index: log index_type: log
それから以下のコマンドで実行するとElasticserchにデータが取り込まれます。embulk guess では先ほどのseed.ymlを指定します。
embulk gem install embulk-output-elasticsearch embulk guess ./json/seed.yml -o config.yml embulk preview config.yml embulk run config.yml
最後にElasticserchのデータを検索してみて取り込まれていることが確認できます。今回はパース結果のjsonのフォーマットが操作の内容毎で扱いやすくないので不便ですが、扱いやすいようにパーサーを修正したら時系列データの追跡が大分楽になりそうでした。
curl -XGET -H 'Content-Type: application/json' "localhost:9200/_search?pretty" -d '{ "_source": ["timestamp", "content"], "query": { "match_all": {} }, "sort": [{"timestamp": {"order": "desc"}}], "from": 5, "size": 5 }' { "took" : 16, "timed_out" : false, "_shards" : { "total" : 2, "successful" : 1, "skipped" : 0, "failed" : 1, "failures" : [ { "shard" : 0, "index" : "customer", "node" : "TtHQVusaSJWtY7apyFisKA", "reason" : { "type" : "query_shard_exception", "reason" : "No mapping found for [timestamp] in order to sort on", "index_uuid" : "51-lMr9pTWOXp98lwobGUQ", "index" : "customer" } } ] }, "hits" : { "total" : { "value" : 6224, "relation" : "eq" }, "max_score" : null, "hits" : [ { "_index" : "log", "_type" : "log", "_id" : "zvhLRXIBA0leiksQaxqN", "_score" : null, "_source" : { "content" : "add CAB:38.532", "timestamp" : "2020-05-24T11:30:37.000001" }, "sort" : [ 1590319837000 ] }, { "_index" : "log", "_type" : "log", "_id" : "z_hLRXIBA0leiksQaxqN", "_score" : null, "_source" : { "content" : "hello hello BBB", "timestamp" : "2020-05-24T11:30:37.000001" }, "sort" : [ 1590319837000 ] }, { "_index" : "log", "_type" : "log", "_id" : "0PhLRXIBA0leiksQaxqN", "_score" : null, "_source" : { "content" : "move from BAC to AAA : 30.179", "timestamp" : "2020-05-24T11:30:37.000002" }, "sort" : [ 1590319837000 ] }, { "_index" : "log", "_type" : "log", "_id" : "0fhLRXIBA0leiksQaxqN", "_score" : null, "_source" : { "content" : "move from AAC to CBA : 71.724", "timestamp" : "2020-05-24T11:30:37.000002" }, "sort" : [ 1590319837000 ] }, { "_index" : "log", "_type" : "log", "_id" : "0vhLRXIBA0leiksQaxqN", "_score" : null, "_source" : { "content" : "globalRemove 13.798", "timestamp" : "2020-05-24T11:30:37.000002" }, "sort" : [ 1590319837000 ] } ] } }