Parsing lines from a Spark RDD
A typical Apache Spark application using the RDD API starts like this:
val lines = sc.textFile("/path/to/data")
val records = lines.map(parseLineToRecord)

case class Record(...)

def parseLineToRecord(line: String): Record = {
  // The -1 limit keeps trailing empty fields that split would otherwise drop
  val parts = line.split("\t", -1)
  ...
  Record(...)
}
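To make the pattern concrete, here is a minimal sketch of such a parser; the two-field Record and the tab-separated layout are hypothetical, chosen only for illustration:

case class Record(id: Long, name: String)

def parseLineToRecord(line: String): Record = {
  val parts = line.split("\t", -1)
  // Throws (e.g. NumberFormatException) on lines that don't match the layout
  Record(parts(0).toLong, parts(1))
}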
When the input contains bad records, you often want to discard the unparseable lines:
import scala.util.control.NonFatal

def parseLineToRecordOption(line: String): Option[Record] = {
  try {
    ...
    Some(Record(...))
  } catch {
    // NonFatal avoids swallowing fatal JVM errors such as OutOfMemoryError
    case NonFatal(_) => None
  }
}
val records = lines.map(parseLineToRecordOption).filter(x => x.isDefined).map(x => x.get)
And then you discover that there is an implicit conversion from Option[T] to Iterable[T]. The nice thing is that you can now use flatMap instead of filter + map:
val records = lines.flatMap(parseLineToRecordOption)
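This works because RDD.flatMap accepts any function returning a TraversableOnce[U], and the compiler inserts Option.option2Iterable to bridge the gap. You can watch the same conversion at work on a plain Scala collection (toIntOption here is a hypothetical helper, just for the demonstration):

// Parse an Int, returning None on failure
def toIntOption(s: String): Option[Int] =
  try Some(s.toInt) catch { case _: NumberFormatException => None }

// Option.option2Iterable lets flatMap treat Some(x) as a one-element
// collection and None as an empty one, so bad values simply disappear
val parsed = List("1", "two", "3").flatMap(toIntOption)
// parsed == List(1, 3)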
Strangely enough, there is no such implicit conversion for Try[T], so we convert to an Option first:
import scala.util.Try

def tryParseLineToRecord(line: String): Try[Record] =
  Try {
    ...
    Record(...)
  }

val records = lines.map(tryParseLineToRecord).flatMap(x => x.toOption)
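If you want to skip the intermediate Option altogether, collect with a partial function over Success does the same filtering; a small alternative sketch:

import scala.util.Success

// collect keeps only the elements the partial function matches,
// so failed parses are dropped in a single step
val records = lines
  .map(tryParseLineToRecord)
  .collect { case Success(record) => record }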