Skip to content

Instantly share code, notes, and snippets.

@eugkhp
Last active June 1, 2020 09:27
Show Gist options
  • Select an option

  • Save eugkhp/15429e56b9e19184c69b752b97eddd1d to your computer and use it in GitHub Desktop.

Select an option

Save eugkhp/15429e56b9e19184c69b752b97eddd1d to your computer and use it in GitHub Desktop.
Zip/unzip
zipped successfully
Vector(80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 101, 99, -63, 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 116, 101, 115, 116, 95, 115, 101, 114, 105, 101, 115, 46, 106, 115, 111, 110, 45, -53, 77, 10, -62, 48, 16, 5, -32, -69, 100, -35, -108, -55, 24, -45, 52, 39, 112, 39, -120, 110, 20, 41, 105, -102, 74, -80, 127, 52, -87, 32, -59, -69, 59, -48, 50, -101, 121, -13, -26, 91, 89, 104, -104, 97, -54, 53, 26, 81, 32, -81, -91, -10, 92, 74, -19, -72, 110, -75, -26, 5, -70, -78, 61, 74, 13, -115, 2, -106, -79, 20, 122, 31, -3, 28, 124, 36, -109, 124, 76, -43, -98, 50, -10, -102, -19, -80, 116, 118, 14, -23, 75, -35, -23, 124, -69, -48, 113, 26, -61, -112, -24, -9, -79, 110, 52, -39, 126, -94, 22, 1, -127, -125, -30, 32, -82, 80, 26, 44, -116, -128, 28, -124, 86, 120, -72, 19, -86, 23, -9, -10, -69, -38, 118, 34, 113, -20, 125, -75, -89, -116, 125, 108, -73, 120, 102, 48, -121, -33, -109, -26, 15, 80, 75, 7, 8, -86, -29, -11, -5, -103, 0, 0, 0, -59, 0, 0, 0, 80, 75, 3, 4, 20, 0, 8, 8, 8, 0, 101, 99, -63, 80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 0, 0, 0, 116, 101, 115, 116, 95, 115, 101, 114, 105, 101, 115, 50, 46, 106, 115, 111, 110, 45, -117, -53, 10, -62, 48, 16, 69, -1, 37, -21, -90, 76, -110, 54, 105, -13, 5, -18, 4, -47, -115, 34, -110, -104, 65, -126, 125, -47, -92, -126, 20, -1, -35, -127, -106, -69, -103, 59, -25, -98, -107, -59, -64, 44, 19, 117, 80, 18, 43, -63, -101, 86, 105, 94, 53, 109, -32, 30, -116, -29, 104, -68, 55, 26, -107, -126, 80, -77, -126, -27, -40, 99, -62, 57, 98, 34, 39, 99, -54, -113, -83, 73, 98, -81, -39, 13, 75, -25, -26, -104, -65, 4, 15, -57, -53, -119, -98, -45, 24, -121, 76, -29, -37, -70, -71, -39, -11, 19, 81, 9, 18, 56, 104, 14, -30, 12, -83, -107, -58, 10, 40, 65, 52, 90, -86, 43, 73, 126, 121, -66, 113, -73, -74, -101, -108, 52, -10, -8, -40, 91, -63, 62, -82, 91, -112, 89, 89, -62, -17, 78, -7, 3, 80, 75, 7, 8, -113, -115, 76, -28, -103, 0, 0, 0, -58, 0, 0, 0)
close state:0
String:
String:
unzipped successfully
exhausted input Got string:
Process finished with exit code 0
trait Zipper[F[_]] {
def zip(series: Vector[SeriesData]): F[Stream[F, Byte]]
def unzip(zipped: Stream[F, Byte]): F[Either[ParsingError, Vector[SeriesData]]]
}
object Zipper {
def make[F[_]: Sync: Logging: Clock: ConcurrentEffect: ContextShift](
blocker: Blocker
): Zipper[F] = {
new Impl[F](blocker)
}
case class Impl[F[_]: Sync: Logging: Clock: ConcurrentEffect: ContextShift](blocker: Blocker) extends Zipper[F] {
val chunkSize = 1024
val E: MonadError[F, Throwable] = MonadError[F, Throwable]
override def zip(series: Vector[SeriesData]): F[Stream[F, Byte]] = {
def writeToZos(series: Vector[SeriesData]): ZipOutputStream => Try[Unit] = { zos =>
Try {
series.foreach { s =>
val entry = new ZipEntry(s"${s.timeseries}.json")
zos.putNextEntry(entry)
zos.write(s.asJson.noSpaces.getBytes(StandardCharsets.UTF_8))
zos.closeEntry()
}
}
}
Sync[F].delay {
io.readOutputStream(blocker, chunkSize) { output =>
val zosF: F[ZipOutputStream] = Sync[F].delay(new ZipOutputStream(output))
for {
zos <- zosF
_ <- E.fromTry(writeToZos(series)(zos))
} yield ()
}
}
}
override def unzip(zipped: Stream[F, Byte]): F[Either[ParsingError, Vector[SeriesData]]] = {
unzipF[F](zipped, blocker).compile.toVector
.flatMap(_.sequence)
.map {
_.map(json =>
decode[SeriesData](json)
.fold(err => Left(ParsingError(err.getMessage + " Got string: " + json)), Right(_))
)
}
.map(_.sequence)
}
def unzipF[G[_]](zipped: Stream[G, Byte], bec: Blocker, chunkSize: Int = chunkSize)(implicit
F: ConcurrentEffect[G],
cs: ContextShift[G]
): Stream[G, G[String]] = {
zipped.through(unzipP(bec, chunkSize))
}
def unzipP[G[_]](
bec: Blocker,
chunkSize: Int = chunkSize
)(implicit F: ConcurrentEffect[G], cs: ContextShift[G]): Pipe[G, Byte, G[String]] = {
def unzipEntries(zis: ZipInputStream): Stream[G, G[String]] =
Stream.unfoldEval(zis) { zis0 =>
entry(zis0).map((_, zis0)).value
}
def entry(zis: ZipInputStream): OptionT[G, G[String]] =
OptionT(Sync[G].delay(Option(zis.getNextEntry))).map { x =>
val string = io
.readInputStream[G](F.delay(zis), chunkSize, bec, closeAfterUse = false)
.through(text.utf8Decode)
.compile
.string
string.flatMap { x =>
println("String:" + x)
F.delay(x)
}
}
val pipe: Stream[G, Byte] => Stream[G, G[String]] = { in =>
in.through(io.toInputStream).flatMap { is: InputStream =>
val zis: G[ZipInputStream] = Sync[G].delay(new ZipInputStream(is))
val zres: Stream[G, ZipInputStream] =
Stream.bracket(zis)(zis => Sync[G].delay(println("close state:" + zis.available())))
zres.flatMap { z =>
unzipEntries(z)
}
}
}
pipe
}
}
case class ParsingError(message: String) extends RuntimeException
class ZipSpec extends AnyFlatSpec with Matchers {
implicit val ec = Scheduler.global
implicit lazy val catsEffect: ConcurrentEffect[Task] =
new CatsConcurrentEffectForTask()(Scheduler.global, Task.defaultOptions)
implicit val logs: Logs[Task, Task] = Logs.sync[Task, Task]
it should "zip and unzip series" in {
val series1 = MDSSeriesData(SID.nextId(),
"test_series",
Granularity.Hour,
Vector[MDSPoint](MDSPoint(Instant.now, Vector[Bucket](Bucket("some_bucket", 2.0))))
)
val series = Vector(series1, series1.copy(SID.nextId(), "test_series2"))
def init(): Resource[Task, Zipper[Task]] = {
for {
implicit0(log: Logging[Task]) <- Logs[Task, Task].byName("gw").resource
blocker: Blocker <- Blocker.apply
} yield Zipper.make(blocker)
}
init()
.use { zipper =>
for {
zip: fs2.Stream[Task, Byte] <- zipper.zip(series)
_ <- println("zipped successfully").pure[Task]
data <- zip.compile.toVector
_ <- println(data).pure[Task]
unzip <- zipper.unzip(fs2.Stream.emits(data))
_ <- println("unzipped successfully").pure[Task]
_ <- printRes(unzip).pure[Task]
} yield ()
}
.runSyncUnsafe()
def printRes(either: Either[ParsingError, Vector[SeriesData]]): Unit = {
either match {
case Left(error) => println(error.message)
case Right(vector) => println(vector)
}
()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment