@raksja
Last active February 3, 2018 02:46
Spark job with Async HTTP call
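import org.apache.spark.rdd.RDD
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
// An implicit ExecutionContext must also be in scope for the Future callbacks and Future.sequence below.

// We used this to run one huge query as a batch of smaller sub-queries.
// Break the large workload into smaller chunks; here, the huge query string is split
// into a small set of sub-queries (subQueryList).
// To tune this further, pass an explicit partition count (numSlices) as the second argument to parallelize.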
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq)
// Then map each of those to a task; in this case each task is a Future that yields the response string
val tasks: RDD[Future[String]] = queries.map(query => {
  val task = makeHttpCall(query) // Method returns the HTTP call response as a Future[String]
  // Log failures as a side effect only. The original code also called task.recover and discarded
  // its result, so nothing was actually recovered; the failure still surfaces at Await.result below.
  task onFailure {
    case t => logger.error("execution failed: " + t.getMessage) }
  task
})
// Note: the HTTP calls have not been made yet; map is lazy, so this step is only recorded in the RDD lineage.
// Then, in each partition, combine all the Futures (a partition may hold several tasks) into one with Future.sequence
// and Await its result. This blocks until every Future in that sequence has resolved or the timeout expires.
val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] =>
  val searchFuture: Future[Iterator[String]] = Future.sequence(f)
  Await.result(searchFuture, threadWaitTime.seconds)
}
// Note: at this point you can apply any transformations to this RDD and they will be appended to the lineage.
// Only when you perform an action on that RDD is the mapPartitions step evaluated:
// the sub-queries are issued as parallel HTTP requests and the responses are
// collected into a single RDD.
// If you don't need to transform the content (for example, to parse the response payload),
// you can use foreachPartition instead of mapPartitions. It is an action, so the HTTP calls run immediately.
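// The per-partition pattern above (start every Future, sequence them, block once) can be tried
// without a cluster. The following is only a minimal sketch using the Scala standard library:
// fakeHttpCall, resolvePartition and PartitionFuturesSketch are hypothetical names, fakeHttpCall
// stands in for makeHttpCall, and the global ExecutionContext and the 10 second timeout are assumptions.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PartitionFuturesSketch {
  // Hypothetical stand-in for makeHttpCall: produces a canned response asynchronously.
  def fakeHttpCall(query: String): Future[String] =
    Future { s"response for $query" }

  // Mirrors the body passed to mapPartitions: start every call in the partition,
  // then block once until all of them complete or the timeout expires.
  def resolvePartition(queries: Iterator[String], timeout: FiniteDuration): List[String] = {
    // toList forces the lazy iterator, so every call is in flight before we wait.
    val inFlight: List[Future[String]] = queries.map(fakeHttpCall).toList
    // Future.sequence keeps the input order; a single failed Future fails the whole batch.
    Await.result(Future.sequence(inFlight), timeout)
  }

  def main(args: Array[String]): Unit = {
    val partition = Iterator("q1", "q2", "q3") // plays the role of one RDD partition
    resolvePartition(partition, 10.seconds).foreach(println)
  }
}
```

// Because one failed Future fails the whole sequence, a real job may prefer to recover each
// Future to a fallback value before sequencing; whether that is appropriate depends on the workload.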