import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

object FlattenJson extends App {

  val conf = new SparkConf().setMaster("local[*]").setAppName("JSON Flattener")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  val inputJson =
    """|{
       |  "name": "John",
       |  "age": 30,
       |  "bike": {
       |    "name": "Bajaj",
       |    "models": ["Dominar", "Pulsar"]
       |  },
       |  "cars": [
       |    { "name": "Ford", "models": ["Fiesta", "Focus", "Mustang"] },
       |    { "name": "BMW",  "models": ["320", "X3", "X5"] },
       |    { "name": "Fiat", "models": ["500", "Panda"] }
       |  ]
       |}""".stripMargin('|')
  println(inputJson)

  // Create an RDD holding the single JSON document.
  // (Reading JSON from an RDD[String] is deprecated from Spark 2.0 onwards;
  // spark.read.json(Dataset[String]) is the modern equivalent.)
  val jsonRDD = sc.parallelize(inputJson :: Nil)

  // Create a DataFrame from the JSON.
  val jsonDF = sqlContext.read.json(jsonRDD)

  // Schema and contents of the DataFrame before flattening.
  jsonDF.printSchema()
  jsonDF.show(false)

  // Recursively flattens a DataFrame: each ArrayType column is exploded into
  // one row per element, and each StructType column is expanded into one
  // column per child field, until only flat columns remain.
  def flattenDataframe(df: DataFrame): DataFrame = {
    val fields = df.schema.fields
    val fieldNames = fields.map(_.name)

    for (field <- fields) {
      field.dataType match {
        case _: ArrayType =>
          // explode_outer keeps rows whose array is null or empty,
          // whereas explode would drop them.
          val otherFields = fieldNames.filter(_ != field.name)
          val exprs = otherFields ++ Array(s"explode_outer(${field.name}) as ${field.name}")
          return flattenDataframe(df.selectExpr(exprs: _*))

        case structType: StructType =>
          // Promote each child field to a top-level column, renaming
          // "parent.child" to "parent_child" to avoid ambiguous dots.
          val childFieldNames = structType.fieldNames.map(child => s"${field.name}.$child")
          val newFieldNames = fieldNames.filter(_ != field.name) ++ childFieldNames
          val renamedCols = newFieldNames.map(name => col(name).as(name.replace(".", "_")))
          return flattenDataframe(df.select(renamedCols: _*))

        case _ => // already a flat column; nothing to do
      }
    }
    df
  }

  val flattenedJSON = flattenDataframe(jsonDF)

  // Schema and contents of the DataFrame after flattening.
  flattenedJSON.printSchema()
  flattenedJSON.show(false)
}
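
// For comparison, a minimal self-contained sketch (assuming Spark 2.2+ and the
// SparkSession API) of why the flattener uses explode_outer rather than plain
// explode: explode drops rows whose array is null or empty, while explode_outer
// keeps them with a null element. The object name and sample data below are
// illustrative only and are not part of the flattener itself.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col => column, explode, explode_outer}

object ExplodeOuterDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("explode_outer demo")
    .getOrCreate()
  import spark.implicits._

  // "Jane" has a null models array.
  val df = Seq(
    ("John", Seq("Fiesta", "Focus")),
    ("Jane", null: Seq[String])
  ).toDF("name", "models")

  // explode drops Jane's row entirely: only John's two models appear.
  df.select(column("name"), explode(column("models")).as("model")).show()

  // explode_outer keeps Jane's row with a null model, so no record is lost;
  // this is the behaviour flattenDataframe relies on.
  df.select(column("name"), explode_outer(column("models")).as("model")).show()

  spark.stop()
}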