Reading Nested Parquet File in Scala and Exporting to CSV
Recently, we were working on a problem where a Parquet-compressed file contained many nested structures, and some of those structures had columns of an Array type. Our objective was to read the file and save it as CSV.
We wrote a script in Scala that does the following:
- Handles nested Parquet-compressed content
- Finds columns whose data type is Array and removes them, since they cannot be represented in a flat CSV
Here is the script:
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"
    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns; they cannot be flattened into scalar CSV columns
      case _: ArrayType => Array.empty[Column]
      // Leaf column: alias it with dots and backticks replaced by underscores
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
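For intuition, here is a quick sketch (not part of the original script) of what the function produces for a small, hypothetical nested schema, assuming flattenSchema and the Spark SQL imports listed in the steps below are already loaded: each leaf field becomes one Column aliased with the dots and backticks of its path replaced by underscores, while the array-typed field is dropped.
import org.apache.spark.sql.types._

// Hypothetical schema: a top-level id, a nested address struct, and an array of tags
val sample = StructType(Seq(
  StructField("id", LongType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType)))),
  StructField("tags", ArrayType(StringType))  // array column: removed by flattenSchema
))

flattenSchema(sample).length  // 3 columns: id, address.city, address.zip (tags is gone)
Selecting those columns with df.select(flattenSchema(df.schema): _*) is exactly what the export step below does.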
Here are all the steps you would need to take to read the Parquet compressed content and then export it to disk as a CSV.
val spark = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
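A quick note on the entry point: the SQLContext above is the Spark 1.6-style API. On Spark 2.x the spark-shell already provides a SparkSession named spark, and a standalone application would build one itself; a minimal sketch (the app name is just a placeholder):
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-to-csv")  // placeholder name
  .getOrCreate()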
scala> :paste
// Entering paste mode (ctrl-D to finish)
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colPath = if (prefix == null) s"`${f.name}`" else s"${prefix}.`${f.name}`"
    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenSchema(st, colPath)
      // Skip array-typed columns; they cannot be flattened into scalar CSV columns
      case _: ArrayType => Array.empty[Column]
      // Leaf column: alias it with dots and backticks replaced by underscores
      case _ => Array(col(colPath).alias(colPath.replaceAll("[.`]", "_")))
    }
  })
}
// Exiting paste mode, now interpreting.
flattenSchema: (schema: org.apache.spark.sql.types.StructType, prefix: String)Array[org.apache.spark.sql.Column]
scala>
val df = spark.read.parquet("/user/avkash/test.parquet")
df.select(flattenSchema(df.schema):_*).write.format("com.databricks.spark.csv").save("/Users/avkashchauhan/Downloads/saveit/result.csv")
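Two small notes on the write step (not from the original post): Spark writes the save() path as a directory of part files rather than a single result.csv, and on Spark 2.x the spark-csv package is not required because CSV support is built into the DataFrameWriter. A minimal sketch of the built-in writer, reusing the same flattened selection (the output path is only an example):
df.select(flattenSchema(df.schema): _*)
  .coalesce(1)               // optional: produce a single part file
  .write
  .option("header", "true")
  .csv("/Users/avkashchauhan/Downloads/saveit")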
If you want to see the full working scripts with output, you can visit any of the following links based on your Spark version:
- Here is the full working demo in Spark 2.1.0
- Here is the full working demo in Spark 1.6.x
We got some help from this StackOverflow discussion. Michal Kurka and Michal Malohlava helped me write the above solution; thanks, guys.
That's it. Enjoy!