@tomduhourq
Created December 24, 2018 20:05
Revisions

  1. tomduhourq created this gist Dec 24, 2018.
    15 changes: 15 additions & 0 deletions SparkMapPartitionsHttpClient.scala
    @@ -0,0 +1,15 @@
    val serviceResponsesRDD: RDD[List[ServiceResponse]] = requestsRDD.mapPartitions { requestIterator =>
      val requests = requestIterator.toList
      logger.info(s"[START] Query service with ${requests.length} requests")

      // Provides a CloseableHttpClient with back off should the service fail
      val httpClient = HttpClientBuilder.withBackoff()
      try {
        // Needs an implicit ExecutionContext and scala.concurrent.duration._ in scope
        val responsesToAwait = requests.map(request => Service.call(request, httpClient))
        val sequenceResponses = Future.sequence(responsesToAwait)
        val responses = Await.result(sequenceResponses, 2.minutes)
        logger.info(s"[FINISH] Query service with ${requests.length} requests")
        responses.iterator
      } finally httpClient.close() // always release the client, even on failure or timeout
    }
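
The per-partition pattern above can be tried without a Spark cluster. The following is a minimal sketch, not the gist author's code: `FakeClient`, `PartitionSketch`, `call`, and `processPartition` are hypothetical stand-ins for the HTTP client, `Service.call`, and the function passed to `mapPartitions`, and the global `ExecutionContext` is an assumption. It shows one client per partition, all calls started as futures, a single blocking wait, and the client closed in a `finally` block.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the gist's CloseableHttpClient; it only records whether close() ran.
final class FakeClient extends AutoCloseable {
  @volatile var closed = false
  override def close(): Unit = closed = true
}

object PartitionSketch {
  // Assumption: the global execution context is acceptable for this demo.
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for Service.call: pretends to query a service asynchronously.
  def call(request: Int, client: FakeClient): Future[String] =
    Future(s"response-$request")

  // Same shape as the function passed to mapPartitions: Iterator in, Iterator out.
  def processPartition(requests: Iterator[Int], client: FakeClient): Iterator[String] = {
    val batch = requests.toList // materialize the partition so its size is known
    try {
      val pending = batch.map(call(_, client))                   // start every call
      Await.result(Future.sequence(pending), 2.minutes).iterator // wait once for all of them
    } finally client.close()                                     // runs on success, failure, or timeout
  }
}
```

Because the awaited result is already a fully built `List`, closing the client before the iterator is consumed is safe here. With a lazily evaluated iterator the client would have to stay open until the last element is read.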