ApacheSparkで扱うobjectのSerializableの必要性について

ApacheSparkで扱うobjectのSerializableの必要性について

hiveやファイルからデータを読み込んだ直後値はRDD, Dataset, DataFrameになっていて、少ないデータに対して何回もfilter処理を行う必要がある場合に一旦collectして配列に変換しdriver内で処理したい場合もあると思うけど、データを読み込んだ際にnon-serializableなクラスに値をセットしていたらcollectで配列への変換時にエラーが発生したのでその際のメモ

例えば以下のようなjavaのクラスがあったとして、これをsparkで利用するとする

public class IdsBean {
  private int id;
  public IdsBean(int id) { /* compiled code */ }
  public int getId() { /* compiled code */ }
  public void setId(int id) { /* compiled code */ }
}
scala> val rdd = sc.makeRDD(1 to 10)
scala> val rddBean = rdd.map(new IdsBean(_))

foreachやmapの処理は行える

scala> rddBean.foreach{ row => println(row.getId) }
6
7
8
9
10
1
2
3
4
5
scala> rddBean.map(row=>row).foreach(row=>println(row.getId))
1
2
3
4
5
6
7
8
9
10

ただcollectの処理はエラーが発生する

scala> rddBean.collect
[Stage 10:>                                                         (0 + 0) / 2]17/10/07 13:43:41 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 20)
java.io.NotSerializableException: dto.IdsBean
Serialization stack:
    - object not serializable (class: dto.IdsBean, value: dto.IdsBean@2872403e)
    - element of array (index: 0)
    ...

collectの処理ではRDDは各executorに分散されていてそれをdriverに集めるのだが、その際にserialize → deserializeの処理が実行されるのだが、対象のクラスがnon-serializableの場合は シリアライズできないのでcollectの処理を実行する時点でエラーが発生するようだ。

対応策として以下のように対象のクラスを継承してserializableにすることを考えたがそれではダメだった。

scala> class IdsBeanS(id: Int) extends dto.IdsBean(id) with java.io.Serializable

これについてはjava.io.Serializableのjavadocでコメントがあり、non-serializableなクラスのsubtypeをSerilizableにする場合はスーパークラスのpublic, protectedなメンバ変数がシリアライズの対象となるようだ。

To allow subtypes of non-serializable classes to be serialized, the
subtype may assume responsibility for saving and restoring the
state of the supertype's public, protected, and (if accessible)
package fields.  The subtype may assume this responsibility only if
the class it extends has an accessible no-arg constructor to
initialize the class's state.  It is an error to declare a class
Serializable if this is not the case.  The error will be detected at
runtime.

また、executor-driver間でobjectを共有する場合はSerializableでになっている必要がわかった。dirver-executor間で値を共有するタイミングだがmapやforeach、collectなどのapiを実行時のようで、シリアライズ、でシリアライズの対象になっているobjectはapiによっても違っているようだ。 例えば先ほどのnon-serializableなRDDについて以下の処理であれば問題ない。

scala> rddBean.foreach{ row => println(row.getId) }

だが、mapやforeachの処理内でnon-sirializableなクラスを使用しようとしたらエラーが発生する。mapやforeachの内部で使うobjectは各executorに送られるのでserializableである必要があるようだ。

scala> val rdd = sc.makeRDD(1 to 10)
scala> rdd.foreach{rddRow =>
     |   rddBean.foreach(beanRow => println(beanRow.getId))
     | }
17/10/07 14:09:03 ERROR executor.Executor: Exception in task 0.0 in stage 13.0 (TID 26)
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

いつexecutor間にobjectが送られるのかは知っていた方が良いけど、基本的にはSerializableなobjectを使うのが安全そう。 あと同一のexecutorにobjectが送られたとしてもtask内でシリアライズ、デシリアライズして使用するからtask間で同じobjectを参照するということはないらしい。 (基本的なことだけど、object自体が状態を持っていてスレッドセーフでなければ問題が発生するかもしれない) https://twitter.com/maropu/status/889747740858568704