import org.apache.spark.sql.{DataFrame, DataFrameReader}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

// Build a DataFrameReader for the Elasticsearch ("es") source, folding every
// entry of the option map onto the reader.
def configureReaderWithElasticSearch(options: Map[String, String]): DataFrameReader =
  options.foldLeft(spark.read.format("es")) {
    case (reader, (key, value)) => reader.option(key, value)
  }

// Load an index as a DataFrame; the resource string is "index/type", and here
// the mapping type shares the index name.
def loadDataFromElasticSearch(index: String)(reader: DataFrameReader): DataFrame =
  reader.load(s"$index/$index")

// Compose configuration and loading into a single Map[String, String] => DataFrame.
def loadDataFromES2(index: String): Map[String, String] => DataFrame =
  (loadDataFromElasticSearch(index) _) compose configureReaderWithElasticSearch

def loadClusters2(issueTypeId: String): DataFrame = {
  val esConfigOptions = Map(
    ConfigurationOptions.ES_NODES_WAN_ONLY -> Settings.esWanOnly,
    ConfigurationOptions.ES_PORT           -> Settings.esPort,
    ConfigurationOptions.ES_NODES          -> Settings.esNodes,
    "es.read.metadata"                     -> "true" // expose _metadata so the document id can be read
  )
  val clusters = loadDataFromES2("cluster")(esConfigOptions)
    .withColumn("id", $"_metadata._id")
    .drop("_metadata", "changeRequestIds", "comments", "rate", "clusterId", "originalId")
    .filter($"issueTypeId" === issueTypeId)
  addColumnsToDataframe(clusters, joinColumns.toArray)
}
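The code above leans on several definitions from the surrounding project: the `spark` session, the `Settings` configuration object, the `joinColumns` value, and the `addColumnsToDataframe` helper. Below is a minimal, hypothetical sketch of those pieces, just so the snippet compiles standalone, plus a usage example; the names, defaults, and helper behavior are assumptions, not the original implementation, and in a real file these definitions would precede the functions above.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

// Assumed: a local SparkSession; the real one is configured elsewhere.
val spark: SparkSession = SparkSession.builder()
  .appName("es-clusters")
  .master("local[*]")
  .getOrCreate()

// Assumed: Elasticsearch connection settings, normally read from project config.
object Settings {
  val esNodes: String   = "localhost"
  val esPort: String    = "9200"
  val esWanOnly: String = "true"
}

// Assumed: the columns that later joins expect on every cluster DataFrame.
val joinColumns: Seq[String] = Seq("projectId", "componentId")

// Hypothetical helper: add each expected column as a typed null when absent,
// so downstream joins never fail on a missing column.
def addColumnsToDataframe(df: DataFrame, columns: Array[String]): DataFrame =
  columns.foldLeft(df) { (acc, name) =>
    if (acc.columns.contains(name)) acc
    else acc.withColumn(name, lit(null).cast("string"))
  }

// Usage: load the clusters for one issue type.
val clustersForBugs = loadClusters2("10004")
clustersForBugs.show(truncate = false)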