Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save lanceon/2c1c9910d32370845d4cf718e014d3c9 to your computer and use it in GitHub Desktop.

Select an option

Save lanceon/2c1c9910d32370845d4cf718e014d3c9 to your computer and use it in GitHub Desktop.

Revisions

  1. @jrudolph jrudolph revised this gist Jun 8, 2015. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion TestMultipartFileUpload.scala
    Original file line number Diff line number Diff line change
    @@ -57,7 +57,6 @@ object TestMultipartFileUpload extends App {
    lastReport = now
    lastSize = newSize
    }
    Thread.sleep(1)
    (newSize, newChunks)
    }

  2. @jrudolph jrudolph revised this gist Jun 8, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion TestMultipartFileUpload.scala
    Original file line number Diff line number Diff line change
    @@ -83,7 +83,7 @@ object TestMultipartFileUpload extends App {
    Source.single(
    Multipart.FormData.BodyPart(
    "test",
    HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file)),
    HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file, chunkSize = 100000)), // the chunk size here is currently critical for performance
    Map("filename" -> file.getName))))
    Marshal(formData).to[RequestEntity]
    }
  3. @jrudolph jrudolph created this gist Jun 8, 2015.
    121 changes: 121 additions & 0 deletions TestMultipartFileUpload.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,121 @@
    package akka.http.scaladsl

    import java.io.File

    import akka.http.scaladsl.unmarshalling.Unmarshal
    import akka.util.ByteString

    import scala.concurrent.duration._

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http.ServerBinding
    import akka.http.scaladsl.marshalling.Marshal
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Route
    import akka.stream.ActorFlowMaterializer
    import akka.stream.io.SynchronousFileSource
    import akka.stream.scaladsl.Source
    import com.typesafe.config.{ ConfigFactory, Config }

    import scala.concurrent.Future

    object TestMultipartFileUpload extends App {
    val testConf: Config = ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.log-dead-letters = off""")
    implicit val system = ActorSystem("ServerTest", testConf)
    import system.dispatcher
    implicit val materializer = ActorFlowMaterializer()

    val testFile = new File(args(0))

    def startTestServer(): Future[ServerBinding] = {
    import akka.http.scaladsl.server.Directives._

    val route: Route =
    path("upload") {
    entity(as[Multipart.FormData]) { (formdata: Multipart.FormData)
    val fileNamesFuture = formdata.parts.mapAsync(1) { p
    println(s"Got part. name: ${p.name} filename: ${p.filename}")

    println("Counting size...")
    @volatile var lastReport = System.currentTimeMillis()
    @volatile var lastSize = 0L
    def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
    val (oldSize, oldChunks) = counter
    val newSize = oldSize + chunk.size
    val newChunks = oldChunks + 1

    val now = System.currentTimeMillis()
    if (now > lastReport + 1000) {
    val lastedTotal = now - lastReport
    val bytesSinceLast = newSize - lastSize
    val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */

    println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s")

    lastReport = now
    lastSize = newSize
    }
    Thread.sleep(1)
    (newSize, newChunks)
    }

    p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
    case (size, numChunks)
    println(s"Size is $size")
    (p.name, p.filename, size)
    }
    }.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", "))

    complete {
    fileNamesFuture
    }
    }
    }
    Http().bindAndHandle(route, interface = "localhost", port = 0)
    }

    def createEntity(file: File): Future[RequestEntity] = {
    require(file.exists())
    val formData =
    Multipart.FormData(
    Source.single(
    Multipart.FormData.BodyPart(
    "test",
    HttpEntity(MediaTypes.`application/octet-stream`, file.length(), SynchronousFileSource(file)),
    Map("filename" -> file.getName))))
    Marshal(formData).to[RequestEntity]
    }

    def createRequest(target: Uri, file: File): Future[HttpRequest] =
    for {
    e createEntity(file)
    } yield HttpRequest(HttpMethods.POST, uri = target, entity = e)

    try {
    val result =
    for {
    ServerBinding(address) startTestServer()
    _ = println(s"Server up at $address")
    port = address.getPort
    target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
    req createRequest(target, testFile)
    _ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
    response Http().singleRequest(req)
    responseBodyAsString Unmarshal(response).to[String]
    } yield responseBodyAsString

    result.onComplete { res
    println(s"The result was $res")
    system.shutdown()
    }

    system.scheduler.scheduleOnce(60.seconds) {
    println("Shutting down after timeout...")
    system.shutdown()
    }
    } catch {
    case _: Throwable system.shutdown()
    }
    }