Last active
June 1, 2020 09:27
-
-
Save eugkhp/15429e56b9e19184c69b752b97eddd1d to your computer and use it in GitHub Desktop.
Zip/unzip
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| zipped successfully | |
| Vector(80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 101, 99, -63, 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 116, 101, 115, 116, 95, 115, 101, 114, 105, 101, 115, 46, 106, 115, 111, 110, 45, -53, 77, 10, -62, 48, 16, 5, -32, -69, 100, -35, -108, -55, 24, -45, 52, 39, 112, 39, -120, 110, 20, 41, 105, -102, 74, -80, 127, 52, -87, 32, -59, -69, 59, -48, 50, -101, 121, -13, -26, 91, 89, 104, -104, 97, -54, 53, 26, 81, 32, -81, -91, -10, 92, 74, -19, -72, 110, -75, -26, 5, -70, -78, 61, 74, 13, -115, 2, -106, -79, 20, 122, 31, -3, 28, 124, 36, -109, 124, 76, -43, -98, 50, -10, -102, -19, -80, 116, 118, 14, -23, 75, -35, -23, 124, -69, -48, 113, 26, -61, -112, -24, -9, -79, 110, 52, -39, 126, -94, 22, 1, -127, -125, -30, 32, -82, 80, 26, 44, -116, -128, 28, -124, 86, 120, -72, 19, -86, 23, -9, -10, -69, -38, 118, 34, 113, -20, 125, -75, -89, -116, 125, 108, -73, 120, 102, 48, -121, -33, -109, -26, 15, 80, 75, 7, 8, -86, -29, -11, -5, -103, 0, 0, 0, -59, 0, 0, 0, 80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 101, 99, -63, 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 0, 0, 0, 116, 101, 115, 116, 95, 115, 101, 114, 105, 101, 115, 50, 46, 106, 115, 111, 110, 45, -117, -53, 10, -62, 48, 16, 69, -1, 37, -21, -90, 76, -110, 54, 105, -13, 5, -18, 4, -47, -115, 34, -110, -104, 65, -126, 125, -47, -92, -126, 20, -1, -35, -127, -106, -69, -103, 59, -25, -98, -107, -59, -64, 44, 19, 117, 80, 18, 43, -63, -101, 86, 105, 94, 53, 109, -32, 30, -116, -29, 104, -68, 55, 26, -107, -126, 80, -77, -126, -27, -40, 99, -62, 57, 98, 34, 39, 99, -54, -113, -83, 73, 98, -81, -39, 13, 75, -25, -26, -104, -65, 4, 15, -57, -53, -119, -98, -45, 24, -121, 76, -29, -37, -70, -71, -39, -11, 19, 81, 9, 18, 56, 104, 14, -30, 12, -83, -107, -58, 10, 40, 65, 52, 90, -86, 43, 73, 126, 121, -66, 113, -73, -74, -101, -108, 52, -10, -8, -40, 91, -63, 62, -82, 91, -112, 89, 89, -62, -17, 78, -7, 3, 80, 75, 7, 8, -113, -115, 76, -28, -103, 0, 0, 0, -58, 0, 0, 0) | |
| close state:0 | |
| String: | |
| String: | |
| unzipped successfully | |
| exhausted input Got string: | |
| Process finished with exit code 0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Algebra for packing a batch of series into a byte stream and reading it back.
  *
  * @tparam F effect type in which both operations are expressed
  */
trait Zipper[F[_]] {

  /** Builds the byte stream that carries the given series.
    *
    * @param series series to pack
    * @return an effect yielding the (not yet consumed) stream of packed bytes
    */
  def zip(series: Vector[SeriesData]): F[Stream[F, Byte]]

  /** Reads series back from a packed byte stream.
    *
    * @param zipped bytes to unpack
    * @return `Right` with the decoded series, or `Left` with a [[ParsingError]]
    */
  def unzip(zipped: Stream[F, Byte]): F[Either[ParsingError, Vector[SeriesData]]]
}
| object Zipper { | |
/** Creates the [[Impl]]-backed [[Zipper]].
  *
  * @param blocker blocker handed to the implementation for its stream I/O
  * @return a `Zipper` running in `F`
  */
def make[F[_]: Sync: Logging: Clock: ConcurrentEffect: ContextShift](
    blocker: Blocker
): Zipper[F] =
  Impl[F](blocker)
/** [[Zipper]] implementation backed by `java.util.zip` and the fs2 `io` stream bridges.
  *
  * Every series is stored as one archive entry named `<timeseries>.json` whose content is the
  * compact JSON encoding of the series in UTF-8.
  *
  * @param blocker blocker used for the blocking `java.io` reads and writes
  */
case class Impl[F[_]: Sync: Logging: Clock: ConcurrentEffect: ContextShift](blocker: Blocker) extends Zipper[F] {

  /** Chunk size (in bytes) used when bridging `java.io` streams and fs2 streams. */
  val chunkSize: Int = 1024

  val E: MonadError[F, Throwable] = MonadError[F, Throwable]

  /** Builds a stream of zip-archive bytes with one `<timeseries>.json` entry per series.
    *
    * Nothing is written until the returned stream is run. A failure while writing an entry
    * fails the stream.
    *
    * @param series series to archive
    * @return an effect yielding the stream of archive bytes
    */
  override def zip(series: Vector[SeriesData]): F[Stream[F, Byte]] = {
    def writeArchive(zos: ZipOutputStream): Unit = {
      series.foreach { s =>
        zos.putNextEntry(new ZipEntry(s"${s.timeseries}.json"))
        zos.write(s.asJson.noSpaces.getBytes(StandardCharsets.UTF_8))
        zos.closeEntry()
      }
      // Without finish() the central directory is never written and the archive is truncated.
      zos.finish()
    }

    Sync[F].delay {
      io.readOutputStream[F](blocker, chunkSize) { output =>
        // Writes into the pipe block until the reader drains it, so keep them on the blocker.
        blocker.delay[F, Unit](writeArchive(new ZipOutputStream(output)))
      }
    }
  }

  /** Reads every entry of the archive and decodes each one as a `SeriesData`.
    *
    * @param zipped archive bytes
    * @return `Right` with the decoded series in archive order, or `Left` with the first
    *         decoding failure (the message includes the offending text)
    */
  override def unzip(zipped: Stream[F, Byte]): F[Either[ParsingError, Vector[SeriesData]]] =
    unzipF[F](zipped, blocker).compile.toVector
      .flatMap(_.sequence)
      .map { jsons =>
        jsons.map { json =>
          decode[SeriesData](json).left
            .map(err => ParsingError(err.getMessage + " Got string: " + json))
        }
      }
      .map(_.sequence)

  /** Applies [[unzipP]] to `zipped`.
    *
    * @param zipped    archive bytes
    * @param bec       blocker for the blocking reads
    * @param chunkSize read chunk size in bytes
    * @return one element per archive entry, holding that entry's UTF-8 decoded content
    */
  def unzipF[G[_]](zipped: Stream[G, Byte], bec: Blocker, chunkSize: Int = chunkSize)(implicit
      F: ConcurrentEffect[G],
      cs: ContextShift[G]
  ): Stream[G, G[String]] =
    zipped.through(unzipP(bec, chunkSize))

  /** Pipe turning archive bytes into one element per entry.
    *
    * Each entry is read completely while the `ZipInputStream` is still positioned on it; the
    * emitted `G[String]` is an already-computed value. Deferring the read would not work:
    * by the time a caller ran it, the shared stream would have advanced past the entry
    * (or to the end of the archive) and the read would yield an empty string.
    *
    * @param bec       blocker for the blocking reads
    * @param chunkSize read chunk size in bytes
    */
  def unzipP[G[_]](
      bec: Blocker,
      chunkSize: Int = chunkSize
  )(implicit F: ConcurrentEffect[G], cs: ContextShift[G]): Pipe[G, Byte, G[String]] = {

    // Reads the entry the stream is currently positioned on, up to that entry's end.
    def readCurrentEntry(zis: ZipInputStream): G[String] =
      io.readInputStream[G](F.delay(zis), chunkSize, bec, closeAfterUse = false)
        .through(text.utf8Decode)
        .compile
        .string

    // Advances to the next entry (None at end of archive) and reads it right away.
    def entry(zis: ZipInputStream): OptionT[G, G[String]] =
      OptionT(F.delay(Option(zis.getNextEntry)))
        .semiflatMap(_ => F.map(readCurrentEntry(zis))(content => F.pure(content)))

    def unzipEntries(zis: ZipInputStream): Stream[G, G[String]] =
      Stream.unfoldEval(zis)(zis0 => entry(zis0).map((_, zis0)).value)

    in =>
      in.through(io.toInputStream).flatMap { is: InputStream =>
        Stream.eval(F.delay(new ZipInputStream(is))).flatMap(unzipEntries)
      }
  }
}
/** Failure raised when unzipped content cannot be decoded.
  *
  * The text is forwarded to `RuntimeException`, so `getMessage`, `toString` and stack traces
  * carry it as well.
  *
  * @param message human-readable description of the failure
  */
case class ParsingError(message: String) extends RuntimeException(message)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Round-trip check for `Zipper`: zips two series, unzips the bytes and verifies the result. */
class ZipSpec extends AnyFlatSpec with Matchers {
  implicit val ec: Scheduler = Scheduler.global
  implicit lazy val catsEffect: ConcurrentEffect[Task] =
    new CatsConcurrentEffectForTask()(Scheduler.global, Task.defaultOptions)
  implicit val logs: Logs[Task, Task] = Logs.sync[Task, Task]

  it should "zip and unzip series" in {
    val series1 = MDSSeriesData(
      SID.nextId(),
      "test_series",
      Granularity.Hour,
      Vector[MDSPoint](MDSPoint(Instant.now, Vector[Bucket](Bucket("some_bucket", 2.0))))
    )
    val series = Vector(series1, series1.copy(SID.nextId(), "test_series2"))

    // Prints either the parsing failure or the decoded series, for diagnostics.
    def printRes(either: Either[ParsingError, Vector[SeriesData]]): Unit =
      either match {
        case Left(error)   => println(error.message)
        case Right(vector) => println(vector)
      }

    def init(): Resource[Task, Zipper[Task]] =
      for {
        implicit0(log: Logging[Task]) <- Logs[Task, Task].byName("gw").resource
        blocker                       <- Blocker[Task]
      } yield Zipper.make(blocker)

    val result: Either[ParsingError, Vector[SeriesData]] =
      init()
        .use { zipper =>
          for {
            zip   <- zipper.zip(series)
            // Task.eval keeps the printing inside the effect instead of running it eagerly.
            _     <- Task.eval(println("zipped successfully"))
            data  <- zip.compile.toVector
            _     <- Task.eval(println(data))
            unzip <- zipper.unzip(fs2.Stream.emits(data))
            _     <- Task.eval(println("unzipped successfully"))
            _     <- Task.eval(printRes(unzip))
          } yield unzip
        }
        .runSyncUnsafe()

    // The round trip must decode every series that was zipped.
    result.map(_.size) shouldBe Right(series.size)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment