Sparkで状態を持つobjectを使い回す方法について調べてみた

Sparkでの開発時に既存のjava資源を使用するのはよくあると思うけど、objectが状態を持っていて使い回す必要がある場合も考えられるのでその場合どうすれば良いのか調べてみました。

まず以下のように引数で与えた値だけ内部のカウンターを増やすクラスがあったとする。

class MyCounter extends java.io.Serializable{    
  private var count = 0    
  def countUp(up: Int): Unit ={    
    this.count += up    
  }    
  def getCount = this.count    
}    

このクラスに対して1~10の設定でそれぞれの設定によりカウントアップする数値が決まる使い方を想定する。このクラスに対してログを読み込むたびにカウントアップのメソッドを呼び出したいけど、設定を引数として渡すものになっているので1の設定なら次カウントアップする際も1の設定を維持するといった必要があります。

まず設定とカウンターobjectをRDDにしてmap処理にてカウントアップ後自分自身を返すようにしてみます。この場合は想定通り、map後にカウントアップ後の結果が入っていることが確認できます。

val configRdd = sc.makeRDD(1 to 10)    
val counterRdd = configRdd.map((new MyCounter, _))    

val afterCountup = counterRdd.map{ exeConfig =>    
  exeConfig._1.countUp(exeConfig._2)    
  exeConfig    
}    
afterCountup.foreach{ exeConfig =>    
  println(exeConfig._1.getCount)    
}    

次に一つのcounterオブジェクトをログのRDDのmap処理内でカウントアップしてみる。
これはtask内でobjectが共有されるためcount=1の結果が100個にならない
自分の環境だとmap後の100個のカウンターが0~50の値を取っていた。map処理内でtaskが2個に別れたのか。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
val newc = logRdd.map { log =>    
  myc.countUp(1)    
  myc    
}    
newc.foreach{row => println(row.getCount) }    

foreach内でのカウントアップだが、これだとforeachの中と外では別のobjectのためカウントアップの結果はforeachの外には反映されていない。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
logRdd.foreach { log =>    
  myc.countUp(1)    
}    
println(myc.getCount)    

並列処理とは関係なくなるけど配列のforeach処理内でカウンターobjectをカウントアップするのであれば想定通りカウントアップする。

val myc = new MyCounter    
val logRdd = sc.makeRDD(1 to 100)    
logRdd.collect.foreach { log =>    
  myc.countUp(1)    
}    
println(myc.getCount)    

1 ~ 10の設定のobjectに対してlogを読み込んで状態を持つobjectでカウントアップする場合は、カウンターのobjectをRDDにしてそれからmapで配列に変換したログのforeachでカウントアップするようにしたら良さそう。この場合はRDDとして使用するカウンターobjectの数だけ並列に処理することになるのか。

val configRdd = sc.makeRDD(1 to 10)    
val counterRdd = configRdd.map((new MyCounter, _))    
val logRdd = sc.makeRDD(1 to 100)    

val logRddArray = logRdd.collect    
val logRddArrayBroadcast = sc.broadcast(logRddArray)    


val countUp = counterRdd.map{ exeConfig =>    
  logRddArrayBroadcast.value.foreach{log=>    
    exeConfig._1.countUp(exeConfig._2)    
  }    
  exeConfig    
}    
countUp.foreach { exeConfig =>    
  println(exeConfig._1.getCount)    
}