In Spark, if you try to do anything cute [read: inheritance from an abstract class] with case classes, e.g.,
abstract class PairReducable[A <: Keyable, B, P <: PairReducable[A, B, P]]
    (val item: A, val amount: B)
  extends Product2[A, B]
  with Amount[B] {

  override val _1 = item
  override val _2 = amount

  // Amounts may only be combined when both items resolve to the same key.
  def reduce(other: PairReducable[A, B, P]): P =
    this.item.toKey == other.item.toKey match {
      case false =>
        throw new UnsupportedOperationException(
          "the key values for both items must match in order to combine amounts")
      case true => reduceWith(other.amount)
    }

  protected def reduceWith(othersAmount: B): P
}

case class Count[A <: Keyable](override val item: A, override val amount: Long)
  extends PairReducable[A, Long, Count[A]](item, amount) {

  override protected def reduceWith(othersAmount: Long): Count[A] =
    new Count(item, amount + othersAmount)
}
when you encounter serialization exceptions on your workers, all you may need to do to resolve them is mix in java.io.Serializable:
abstract class PairReducable[A <: Keyable, B, P <: PairReducable[A, B, P]]
    (val item: A, val amount: B)
  extends Product2[A, B]
  with Amount[B]
  with java.io.Serializable {
  /* ... */
}

case class Count[A <: Keyable](override val item: A, override val amount: Long)
  extends PairReducable[A, Long, Count[A]](item, amount)
  with java.io.Serializable {
  /* ... */
}
You most likely do not need to give them dummy no-argument constructors, but if you did, it might look something like this terrible thing:
case class Count[A <: Keyable](override val item: A, override val amount: Long)
  extends PairReducable[A, Long, Count[A]](item, amount)
  with java.io.Serializable {

  // dummy no-arg constructor for frameworks that insist on one
  def this() = this(null.asInstanceOf[A], 0L)

  /* ... */
}