Parsing a log file with a custom parser and loading it into Elasticsearch via Embulk

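Suppose you have time-series log data like the following, which records add, remove, and set operations on keys ranging from AAA to CCC. Working out the current value of each key by reading such a log by hand is tedious.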

2020-05-24 11:25:18,784 INFO jp.co.teruuu.LoggingService [main] add BAA:99.674
2020-05-24 11:25:18,784 INFO jp.co.teruuu.LoggingService [main] set BBB:5.730
2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] hello hello CCC
2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] move from ABCtoCAC : 68.080
2020-05-24 11:25:18,785 INFO jp.co.teruuu.LoggingService [main] add ACC:20.390
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello ABA!
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] add AAA:54.581
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] globalRemove 46.694
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello world AAC
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello BCB!
2020-05-24 11:25:18,786 INFO jp.co.teruuu.LoggingService [main] hello hello ABB!
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] set AAC:59.363
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] globalAdd 63.874
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] set CCC:85.101
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello hello ABC!
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello hello CBC!
2020-05-24 11:25:18,787 INFO jp.co.teruuu.LoggingService [main] hello world ACC!
2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] globalAdd 55.108
2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] move from BBBtoBAC : 46.040
2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] move from CAAtoCCC : 90.571
2020-05-24 11:25:18,788 INFO jp.co.teruuu.LoggingService [main] add BCA:36.741

To address this, I will try parsing the log file and loading it into Elasticsearch. The log used for this experiment is one I generated myself: string_generator/LoggingService.java at master · teruuuuuu/string_generator · GitHub

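The goal here is simply to narrow down a target and make its time-series data easier to trace, so loading the parsed result into something like a document-oriented database is enough. There is no need for real-time aggregation and I wanted to process the data in batches, so I chose Embulk. The log format is assumed not to be fixed: each operation writes its message in whatever form it likes.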

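Embulk has the plugins listed at the link below. Looking through the FILE PARSER category, though, there is unsurprisingly nothing that can cleanly ingest free-form log output like this. plugins.embulk.org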

So I will write my own parser to convert the log to JSON, and then load the data into Elasticsearch via Embulk.

For now, each line is parsed into a timestamp, log level, name, thread, and log message content, as follows.

case class Log4jLine(date: LocalDateTime, logLevel: LogLevel, name: String, thread: String, content: String) extends LogLine

logging_converter/Log4jParser.scala at 3ec8071cdbc04207d3effc56e81e08734323ea07 · teruuuuuu/logging_converter · GitHub
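As an illustration of this parsing step, here is a minimal Python sketch. It is not the author's Scala parser: the regular expression, the function names `parse_line` and `to_jsonl`, and the choice of JSON field names (taken from the JSON output in this post) are assumptions made for the example, and it only handles lines in the exact layout of the sample log.

```python
import json
import re
from datetime import datetime

# Assumed layout, based on the sample log in this post:
# 2020-05-24 11:25:18,784 INFO jp.co.teruuu.LoggingService [main] add BAA:99.674
LINE_PATTERN = re.compile(
    r"^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) "
    r"(?P<level>[A-Z]+) +"
    r"(?P<package>\S+) "
    r"\[(?P<thread>[^\]]+)\] "
    r"(?P<content>.*)$"
)


def parse_line(line):
    """Return a dict for one log line, or None if the line does not match."""
    match = LINE_PATTERN.match(line.rstrip("\n"))
    if match is None:
        return None
    record = match.groupdict()
    # "%f" reads the three millisecond digits as a fraction of a second.
    parsed = datetime.strptime(record["timestamp"], "%Y-%m-%d %H:%M:%S,%f")
    record["timestamp"] = parsed.isoformat()
    return record


def to_jsonl(lines):
    """Convert log lines to JSON Lines text, skipping lines that do not match."""
    records = (parse_line(line) for line in lines)
    return "\n".join(
        json.dumps(record, sort_keys=True) for record in records if record is not None
    )
```

Lines that do not match are dropped here for simplicity; a real converter would probably want to report them or attach them to the previous record (for example, stack traces).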
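The output of the Scala parser is shown below. I wanted to split the records by operation type, but for now the whole log message is kept in content, and the log could at least be written out as JSON.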

{"content":"move from CAB to CAA : 17.383","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"}
{"content":"hello hello BAA","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"}
{"content":"globalAdd 8.308","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000202"}
{"content":"hello hello BAA!","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"}
{"content":"globalRemove 61.999","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"}
{"content":"move from BCB to BCC : 96.230","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"}
{"content":"hello hello CAA!","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"}
{"content":"remove BBA:41.788","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000203"}
{"content":"globalAdd 89.201","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"remove CCA:90.795","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"globalAdd 45.250","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"globalAdd 23.735","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"move from ACB to CBA : 77.454","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"hello world ACB","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"set CCC:25.078","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"move from CAC to CBC : 83.523","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000204"}
{"content":"move from BCC to CCA : 86.934","level":"INFO","package":"jp.co.teruuu.LoggingService","thread":"main","timestamp":"2020-05-24T11:30:37.000205"}
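The content field above still holds the raw message. One way to split it by operation might look like the following Python sketch. The message formats are inferred only from the sample lines in this post, the keys are assumed to be uppercase letters, and the function name `parse_content` and the output field names (`operation`, `source`, `target`, `value`, `message`) are hypothetical.

```python
import re

NUMBER = r"(\d+(?:\.\d+)?)"

# Formats inferred from the sample log; anything else falls through to "other".
KEYED = re.compile(rf"^(add|remove|set) ([A-Z]+):{NUMBER}$")
GLOBAL = re.compile(rf"^(globalAdd|globalRemove) {NUMBER}$")
# Accepts both "move from ABCtoCAC : 68.080" and "move from CAB to CAA : 17.383".
MOVE = re.compile(rf"^move from ([A-Z]+) ?to ?([A-Z]+) : {NUMBER}$")


def parse_content(content):
    """Split a raw log message into operation-specific fields."""
    match = KEYED.match(content)
    if match:
        return {
            "operation": match.group(1),
            "target": match.group(2),
            "value": float(match.group(3)),
        }
    match = GLOBAL.match(content)
    if match:
        return {"operation": match.group(1), "value": float(match.group(2))}
    match = MOVE.match(content)
    if match:
        return {
            "operation": "move",
            "source": match.group(1),
            "target": match.group(2),
            "value": float(match.group(3)),
        }
    # Messages such as "hello hello ABA!" carry no operation.
    return {"operation": "other", "message": content}
```

With fields like these in each document, a search could filter on a single target key and operation instead of matching against the free-text content.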

Next, the JSON data is loaded into Elasticsearch with Embulk. I set up each environment by following the references below.

□Embulk
github.com

□Elasticsearch
www.elastic.co

I installed the plugins Embulk needs with the following commands.

embulk gem install embulk-parser-jsonl
embulk gem install embulk-output-elasticsearch

As the Embulk configuration file, create the following seed.yml and set path_prefix so that it points at the generated JSON files.

in:
  type: file
  path_prefix: 'C:\mywork\embulk\.\json\json\result_'
  parser:
    type: jsonl
    root: $.logs
    schema:
      - {name: timestamp, type: string}
      - {name: level, type: string}
      - {name: package, type: string}
      - {name: thread, type: string}
      - {name: content, type: string}
out:
  type: elasticsearch
  nodes:
  - {host: localhost, port: 9200}
  index: log
  index_type: log

Running the following commands then loads the data into Elasticsearch. Pass the seed.yml shown above to embulk guess.

embulk gem install embulk-output-elasticsearch
embulk guess ./json/seed.yml -o config.yml
embulk preview config.yml
embulk run     config.yml

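Finally, searching Elasticsearch confirms that the data was loaded. The request below goes to /_search, so it searches every index; the one failed shard reported in the response belongs to a separate customer index that has no timestamp mapping, and sending the request to localhost:9200/log/_search instead would restrict the search to the log index. The JSON produced by the parser is not yet structured per operation, which makes it inconvenient to work with, but once the parser is modified to emit a more convenient format, tracing the time-series data should become much easier.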

curl -XGET -H 'Content-Type: application/json' "localhost:9200/_search?pretty" -d '{
  "_source": ["timestamp", "content"],
  "query": { "match_all": {} },
  "sort": [{"timestamp": {"order": "desc"}}],
  "from": 5,
  "size": 5
}'

{
  "took" : 16,
  "timed_out" : false,
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 1,
    "failures" : [
      {
        "shard" : 0,
        "index" : "customer",
        "node" : "TtHQVusaSJWtY7apyFisKA",
        "reason" : {
          "type" : "query_shard_exception",
          "reason" : "No mapping found for [timestamp] in order to sort on",
          "index_uuid" : "51-lMr9pTWOXp98lwobGUQ",
          "index" : "customer"
        }
      }
    ]
  },
  "hits" : {
    "total" : {
      "value" : 6224,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [
      {
        "_index" : "log",
        "_type" : "log",
        "_id" : "zvhLRXIBA0leiksQaxqN",
        "_score" : null,
        "_source" : {
          "content" : "add CAB:38.532",
          "timestamp" : "2020-05-24T11:30:37.000001"
        },
        "sort" : [
          1590319837000
        ]
      },
      {
        "_index" : "log",
        "_type" : "log",
        "_id" : "z_hLRXIBA0leiksQaxqN",
        "_score" : null,
        "_source" : {
          "content" : "hello hello BBB",
          "timestamp" : "2020-05-24T11:30:37.000001"
        },
        "sort" : [
          1590319837000
        ]
      },
      {
        "_index" : "log",
        "_type" : "log",
        "_id" : "0PhLRXIBA0leiksQaxqN",
        "_score" : null,
        "_source" : {
          "content" : "move from BAC to AAA : 30.179",
          "timestamp" : "2020-05-24T11:30:37.000002"
        },
        "sort" : [
          1590319837000
        ]
      },
      {
        "_index" : "log",
        "_type" : "log",
        "_id" : "0fhLRXIBA0leiksQaxqN",
        "_score" : null,
        "_source" : {
          "content" : "move from AAC to CBA : 71.724",
          "timestamp" : "2020-05-24T11:30:37.000002"
        },
        "sort" : [
          1590319837000
        ]
      },
      {
        "_index" : "log",
        "_type" : "log",
        "_id" : "0vhLRXIBA0leiksQaxqN",
        "_score" : null,
        "_source" : {
          "content" : "globalRemove 13.798",
          "timestamp" : "2020-05-24T11:30:37.000002"
        },
        "sort" : [
          1590319837000
        ]
      }
    ]
  }
}
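For repeated checks, the same search can be issued from a script. The following Python sketch builds the request with the same body as the curl command above, but targets a single index so that other indices are not searched. The function names `build_search` and `search`, the page-based offset, and the default host are assumptions for illustration; only the standard library is used, and `search` needs a running Elasticsearch node.

```python
import json
import urllib.request


def build_search(index, page, size, host="localhost:9200"):
    """Return the URL and request body for one page of results, newest first."""
    url = f"http://{host}/{index}/_search"
    body = {
        "_source": ["timestamp", "content"],
        "query": {"match_all": {}},
        "sort": [{"timestamp": {"order": "desc"}}],
        "from": page * size,  # offset of the first hit on this page
        "size": size,
    }
    return url, body


def search(index, page, size):
    """Send the search request and return the decoded JSON response."""
    url, body = build_search(index, page, size)
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Calling `build_search("log", 1, 5)` yields the same from and size values as the curl example, with the URL pointing at the log index only.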