package eventsourcing

import java.time.Instant

import cats._
import cats.data.Coproduct
import cats.free.{Free, Inject}
import cats.implicits._
import doobie.imports._
import fs2.Stream
import io.circe.{Decoder, Encoder, Json}

/**
  * Mixin for the instructions of a free algebra `F`.
  *
  * The self-type requires every implementor to itself be an `F[A]`, which is
  * what allows an instruction to lift itself into `Free`.
  *
  * @tparam F the instruction set (algebra) the implementor belongs to
  * @tparam A the result type of the instruction
  */
trait FreeOp[F[_], A] { this: F[A] =>

  /** Lifts this instruction into a `Free` program over its own algebra. */
  def liftF: Free[F, A] = Free.liftF(this)

  /**
    * Lifts this instruction into a `Free` program over a larger algebra `G`.
    *
    * @param I evidence that `F` can be injected into `G`
    */
  def inject[G[_]](implicit I: Inject[F, G]): Free[G, A] =
    // Type arguments are spelled out, matching the other `Free.inject` call in this file.
    Free.inject[F, G](this)
}

/** SQL statements for the `event` table. */
object Queries {

  /** A row of the `event` table, as selected by [[streamAllQ]]. */
  final case class DBEvent(id: Int, created: Instant, payload: Json)

  // The access qualifier must name an enclosing scope; this file lives in
  // package `eventsourcing`.
  private[eventsourcing] def appendQ[A](event: A)(implicit enc: Encoder[A]) =
    sql"insert into event(payload) values (${enc(event)})"

  /**
    * Inserts `event` (encoded as JSON) and returns the stored row.
    *
    * The insert only supplies `payload`, so `id` and `created` are read back
    * from the database as generated columns.
    * NOTE(review): assumes the table populates `id` and `created` itself and
    * that the JDBC driver can return generated columns -- confirm against the
    * schema.
    */
  def append[A: Encoder](event: A): ConnectionIO[DBEvent] =
    appendQ(event).update.withUniqueGeneratedKeys[DBEvent]("id", "created", "payload")

  private[eventsourcing] val streamAllQ: Query0[DBEvent] =
    sql"select id, created, payload from event".query[DBEvent]

  /** Streams every row of the `event` table. */
  val streamAll: Stream[ConnectionIO, DBEvent] = streamAllQ.process
}

/**
  * Adds event logging and playback to a user-supplied free algebra.
  *
  * Implementors provide the event type, the algebra, JSON codecs for the
  * events, and a mapping from an event to the algebra instruction it
  * represents.
  */
trait EventSourcing {
  import Queries._

  // Abstract Types
  type Event
  type Algebra[_]

  // Abstract Fields
  implicit def encoder: Encoder[Event]
  implicit def decoder: Decoder[Event]

  /** Maps a stored event to the algebra instruction used to replay it. */
  def eventToAlgebra(e: Event): FreeOp[Algebra, _]

  // EventLogOp algebra for event logging
  sealed trait EventLogOp[A] extends FreeOp[EventLogOp, A] with Product with Serializable

  /** Instruction that appends `event` to the event log. */
  final case class Append(event: Event) extends EventLogOp[Unit]

  type F[A] = Algebra[A]
  type EventLogFree[A] = Free[EventLogOp, A]
  // The user algebra combined with the event-log algebra.
  type C[A] = Coproduct[F, EventLogOp, A]
  type FreeC[A] = Free[C, A]

  object interpreters {

    // The EventLog to ConnectionIO interpreter
    val eventLogOp2ConIO: EventLogOp ~> ConnectionIO = new (EventLogOp ~> ConnectionIO) {
      override def apply[A](fa: EventLogOp[A]): ConnectionIO[A] = fa match {
        // `append` already yields a ConnectionIO; the stored row is discarded
        // because `Append` is an `EventLogOp[Unit]`.
        case Append(e) => append(e).map(_ => ())
      }
    }

    // The ConnectionIO to a Monad `M`
    def conIO2M[M[_]](transactor: Transactor[M]): ConnectionIO ~> M =
      new (ConnectionIO ~> M) {
        override def apply[A](fa: ConnectionIO[A]): M[A] = transactor.trans(fa)
      }

    // `EventLogOp` to a Monad `M`
    def eventLog2M[M[_]](transactor: Transactor[M]): EventLogOp ~> M =
      eventLogOp2ConIO.andThen(conIO2M(transactor))

    // The interpreter that injects into the coproduct and adds an `Append` instruction
    private val F2FreeC = new (F ~> FreeC) {
      override def apply[A](fa: F[A]): FreeC[A] = {
        fa match {
          // NOTE(review): `Event` is an abstract type, so this type test is
          // presumably unchecked after erasure, and there is no case for an
          // instruction that is not an `Event` -- confirm this is intended.
          case e: Event =>
            for {
              // log the instruction first ...
              _ <- Append(e).inject[C]
              // ... then run the instruction itself
              f <- Free.inject[F, C](fa)
            } yield f
        }
      }
    }

    // Any Free algebra to a Monad `M` with event logging
    def F2M[M[_]](f2M: F ~> M, el2M: EventLogOp ~> M)(implicit M: Monad[M]): F ~> M =
      new (F ~> M) {
        override def apply[A](fa: F[A]): M[A] = {
          val v = F2FreeC(fa)
          v.foldMap(f2M or el2M)
        }
      }
  }

  /**
    * Replays the stored events through the interpreter `f2M`.
    *
    * Rows are read with `streamAll`, decoded into events, turned into algebra
    * instructions with [[eventToAlgebra]] and grouped with
    * `chunkLimit(chunkSize)`; each group is sequenced into a single `Free`
    * program and interpreted with `f2M`.
    *
    * Rows whose payload fails to decode are silently discarded.
    *
    * @param transactor used to translate the `ConnectionIO` stream into `M`
    * @param chunkSize  passed to `chunkLimit` to bound the size of each group
    * @param f2M        interpreter for the user algebra
    * @return a stream with one `M[Unit]` per group; building the stream does
    *         not itself run these effects
    */
  def playback[M[_]](transactor: Transactor[M], chunkSize: Int)(f2M: F ~> M)(
      implicit M: Monad[M]): Stream[M, M[Unit]] = {
    val conIOStream: Stream[ConnectionIO, Free[Algebra, Unit]] = streamAll
      // convert the Json into an event
      .map { dbEvent =>
        decoder.decodeJson(dbEvent.payload).toOption
      }
      // discard events that fail to decode
      .collect { case Some(x) => x }
      // convert the events into operations of type Unit
      .map { event =>
        eventToAlgebra(event).liftF.map(_ => ())
      }

    val batchedStream: Stream[ConnectionIO, Free[F, List[Unit]]] = conIOStream
      .chunkLimit(chunkSize)
      // sequence the chunk to convert to a single Free instance
      .map(_.toList.sequence[Free[F, ?], Unit])

    // transform stream from ConnectionIOs to Ms
    val fStreamM: Stream[M, Free[F, List[Unit]]] = transactor.transP(batchedStream)

    // interpret the stream of `Free[F, ?]`s to create a stream of `M`s
    val mStreamM: Stream[M, M[List[Unit]]] = fStreamM.map(_.foldMap(f2M))

    // simplify the return type to `Unit`
    mStreamM.map(_.map(_ => ()))
  }
}