アクターモデルについて調べてみた

バッチ処理でAkkaを使ってみたのでアクターモデルについて調べたことをまとめてみました。
scalaを触ったことがなく並列処理に興味があるけどやったことがないくらいの人が読める内容を目指したいと思います。

 ・スレッドモデルについて
 ・アクターモデルについて
 ・Akkaとは

まず最初にスレッドモデルについて触れてみたいと思います。
◯スレッドモデルについて
 マルチスレッドモデルとシングルスレッドモデルがある
 マルチスレッドモデル
  →同時に複数のスレッドを実行できるため多重アクセスに対応することができる
   別スレッドで同一インスタンスの値を更新するときとかに事故が起きる
   同一のインスタンスを複数スレッドで使い回すことに問題がある場合、javaならsynchronizedを使うことで、インスタンスを共有しないようにすることができる。
 シングルスレッドモデル
  →一つのスレッドしかないため複数リクエストが来ても順番にしか処理できない
  
即時応答性が求められるシステムの場合はマルチスレッドが求められるようですが難易度が高くスレッドセーフなプログラミングを書けるのは一部の優秀な人に限られていたらしい。問題が起きる例として例えばマルチスレッドの場合アクションクラス内で共有させているSimpleDateFormatクラスに値をセットして表示するメソッドを表示するというメソッドに対しての同時リクエストで結果が正しくなくなるといったことが起き得る。(SimpleDateFormatはクラス内で共有させがちでマルチスレッド環境で不具合が出ることがあるらしい。)

じゃあマルチスレッドだとしてもインスタンスを共有しなければ良いじゃないかと思いますが、スレッドモデルでは性能を上げようとした場合、スケールアップではコアの数を増やしても性能の上限が頭打ちになる問題もありますし、サーバの台数を増やすことでスケールアウトはできますがスケーラブルなアクターモデルの方がスケールアウトは向いているのでしょう(良く分かっていない)。
まぁ、CPUの性能を上げるよりかはマルチコア、複数サーバでの分散処理の流れが来ていますのでスレッドだけでは難しいのでしょう。最近ではリアクティブやらメッセージ駆動が注目されているようですし。

そこでアクターモデルです。
アクターモデルについて
 Wikipedia情報ですが基本概念は以下のようになっています。

アクターモデルの基本は「全てのものはアクターである」という哲学である。これはオブジェクト指向プログラミングにおける「全てのものはオブジェクトである」という考え方と似ているが、オブジェクト指向ソフトウェアでは基本的に逐次的に実行するのに対して、アクターモデルでは本質的に並行性を備えている点が異なる。

アクターは並行的に受信するメッセージに対応した以下のような振る舞いを備えた計算実体(Computational Entity)である:

(他の)アクターに有限個のメッセージを送信する。
有限個の新たなアクターを生成する。
次に受信するメッセージに対する動作を指定する。

これらの振る舞いには逐次性は前提とされておらず、並列的にこれらを実行する。

他のアクターとの通信は非同期に発生する(すなわち、送信側アクターはメッセージが受信されるのを待たずに次の計算に移行する)。

メッセージを送信する相手のアクターはアドレスによって指定される(これをアクターの「メールアドレス」とも呼ぶ)。結果として、アクターはアドレスのあるアクターとのみ通信可能であり、他のアクターのアドレスは以下のような方法で獲得される:

受信したメッセージ内にアドレスが書いてある。
そのアクターが何らかの方法で既に相手のアドレスを知っている。
そのアクターは生成したアクターである。

アクターモデルは、アクター自体およびアクター間の計算の本質的並行性を特徴とし、メッセージ内にアクターのアドレスを含め、相互のやりとりは到着順が保証されない直接的非同期メッセージパッシングのみである。

これだけ見るとメッセージを送るオブジェクトと受け取るオブジェクトがいて、メッセージを送る人は受け取る人の状態に関わらずどんどんメッセージを送信できる。メッセージを受け取る人はキューに溜まったメッセージの内容を順次処理するため、アクターが所有しているインスタンスに非同期にアクセスされることはなくスレッドセーフになる。メッセージを送る人もまたアクターであるからメッセージを受け取ることができる、といった感じか。
アクター間は別々のスレッドで動きアクター内ではシングルスレッドで実行されることは分かったけど、並行的に処理を行うにはプログラムを書く際に何を気をつけなければいけないのかよくわからない。例えば集計バッチをアクターモデルで作成し、集計処理自体を一つのアクターにまとめてしまったらマルチスレッドに比べて遅くなるのかとか気になります。ここら辺は自分で触って試していくしかないと思います。

◯Akkaとは
アクターモデルを実現しているフレームワークです。並列分散処理で注目されておりますが、scala自体初めて触る初心者の人はうまく並列処理行えないんじゃないかなと思っています。
以下に1秒間隔でカウント結果を表示するサンプルを示します。
application.conf

akka {
  quartz {
    schedules {
      MessageTask {
        description = "1秒おきに実行"
        expression = "* * * ? * *"
      },
    }
  }
}

Main.scala

object ActorScheduler extends App {
  val _system = ActorSystem("system")
  // スケジューラを生成
  val scheduler = QuartzSchedulerExtension(_system)
  val messagingActor = _system.actorOf(MessagingActor.props, "messagingActor")
  scheduler.schedule(
    "MessageTask",
    messagingActor,
    "sendMessage"
  )
}

class MessagingActor extends Actor with ActorLogging{
  import MessagingActor._

  val receiveActor = context.actorOf(ReceivingActor.props, "receiveActor")
  def receive = {
    case Initialize =>
      log.info("starting MessagingActor")
    case "sendMessage" =>
      println("send message")
      receiveActor !  SendMessage("%tY/%<tm/%<td %<tH:%<tM:%<tS" format new Date)
  }
}

object MessagingActor {
  val props = Props[MessagingActor]
  case object Initialize
  case class SendMessage(text: String)
}

class ReceivingActor extends Actor with ActorLogging{

  var count:Int = 1

  def receive = {
    case MessagingActor.SendMessage(time: String) =>
      println(time + " message receive " + count)
      count += 1
  }
}

object ReceivingActor {
  val props = Props[ReceivingActor]
}

シンプルなので何をやっているかは見たらだいたい分かると思います。ActorSchedulerが1秒間隔でMessagingActorに"sendMessage"を送るようにスケジューリングし、MessagingActorは"sendMessage"を受け取ったらReceivingActorにSendMessageを送ってReceivingActorは受け取ったメッセージとカウントアップ結果を表示するものになっています。
試しに実行してみると1秒間隔でメッセージを表示できていることが確認できます。

このReceivingActorを例えば以下のように1秒間かかる処理を2つ並べたらどうなるでしょう。

class ReceivingActor extends Actor with ActorLogging{

  var count:Int = 1

  def receive = {
    case MessagingActor.SendMessage(time: String) =>
      println(time + " message receive " + count)
      count += 1

      wait1(1000)
      wait2(1000)
  }
}

試してみると1秒間隔で表示されていたメッセージが2秒間隔になっていることが確認できます。これでアクター自体はシングルスレッドで動くといったことが理解できると思います。

では1つのアクター内で大きめの処理を行う場合はどうすれば良いかというと、Scalaのfutureを使って以下のように修正を行います。

class ReceivingActor extends Actor with ActorLogging{
  var count:Int = 1

  def receive = {
    case MessagingActor.SendMessage(time: String) =>
      println(time + " message receive " + count)
      count += 1

      val f1 = Future(wait1(1000))
      val f2 = Future(wait2(1000))

      val ret = for {
        x <- f1
        y <- f2
      } yield f(x, y)
  }
  def wait1(time: Int): Unit ={
    println("wait1 start")
    Thread.sleep(time)
    println("wait1 end")
  }
  def wait2(time: Int): Unit ={
    println("wait2 start")
    Thread.sleep(time)
    println("wait2 end")
  }
  def f(x:Any,y:Any){println("call yield")}

  def ShowMessage(text: String) = {
    println(text)
  }
}

自分もscalaにはあまり慣れていないのですが、この書き方をするとf1とf2はfor内で同時に評価されるようでreceiveメソッドは1秒ほどで終了するようになるみたいです。試してみると以下のような出力があり、1秒間隔でカウントアップ結果が表示されるため並列で処理が行われているようです。
2017/02/16 19:57:14 message receive 20
wait1 start
wait2 start
wait1 end
wait2 end
call yield

状態を持たないアクターモデルアンチパターンと言われているが、このサンプルでは一応countという状態をもっているのか,そもそも並列処理したいだけならfutureだけ使っていれば良いのか
今回のサンプルだとアクターモデルを使うメリットはよくわからないのですが、複数のタスクをアクターで区切ったり、コストの大きい処理を分割して行うときとかには効果を発揮すると思いました。

アクターモデルを使うなら以下のように分割できる処理は分割していくのが良さそうです。ただakkaだけだと大量のリクエストを受けた時のback pressureがなさそうなので、その場合はakka-streamを使えば良いと思います。

class MessagingActor extends Actor with ActorLogging{
  import MessagingActor._

  val receiveActor1 = context.actorOf(ReceivingActor.props, "receiveActor1")
  val receiveActor2 = context.actorOf(ReceivingActor.props, "receiveActor2")

  def receive = {
    case Initialize =>
      log.info("starting MessagingActor")
    case "sendMessage" =>
      println("send message")
      receiveActor1 !  SendMessage1("%tY/%<tm/%<td %<tH:%<tM:%<tS" format new Date)
      receiveActor2 !  SendMessage2("%tY/%<tm/%<td %<tH:%<tM:%<tS" format new Date)
  }
}

object MessagingActor {
  val props = Props[MessagingActor]
  case object Initialize
  case class SendMessage(text: String)
  case class SendMessage1(text: String)
  case class SendMessage2(text: String)
}


class ReceivingActor extends Actor with ActorLogging{

  var count:Int = 1

  def receive = {
    case MessagingActor.SendMessage1(time: String) =>
      wait1(1000)
    case MessagingActor.SendMessage2(time: String) =>
      wait2(1000)
  }
  def wait1(time: Int): Unit ={
    println("wait1 start")
    Thread.sleep(time)
    println("wait1 end")
  }
  def wait2(time: Int): Unit ={
    println("wait2 start")
    Thread.sleep(time)
    println("wait2 end")
  }
}

object ReceivingActor {
  val props = Props[ReceivingActor]
}