import zio.*
import zio.prelude.*
import zio.prelude.fx.*

/** Generic building blocks for `ZPure`-based programs whose input and output state types coincide. */
object BaseSyntax:

  /** A `ZPure` program with no log (`Nothing`), state `S`, environment `R`, error `E` and result `A`. */
  type Program[S, R, E, A] = ZPure[Nothing, S, S, R, E, A]

  /** Like [[Program]], but additionally able to emit log entries of type `W`. */
  type LoggedProgram[W, S, R, E, A] = ZPure[W, S, S, R, E, A]

  /** Lifts the value `a` into a program that succeeds with it. */
  def pure[S, R, A](a: A): Program[S, R, Nothing, A] = ZPure.succeed(a)

  /** The program that succeeds with `()`. */
  def unit[S, R]: Program[S, R, Nothing, Unit] = ZPure.unit

  /** A program that fails with `t`; `t` is passed by name. */
  def raiseError[S, R, E](t: => E): Program[S, R, E, Nothing] = ZPure.fail(t)

  /** Succeeds with `()` when `cond` is true, otherwise fails with `t`. */
  def assertThat[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    if cond then unit else raiseError(t)

  /** Succeeds with `()` when `cond` is false, otherwise fails with `t`. */
  def assertThatNot[S, R, E](cond: => Boolean, t: => E): Program[S, R, E, Unit] =
    assertThat(!cond, t)

  /** Succeeds with the content of `a` when it is defined, otherwise fails with `t`. */
  def extractOption[S, R, E, A](a: Option[A], t: => E): Program[S, R, E, A] =
    a match
      case Some(value) => pure(value)
      case None        => raiseError(t)

  /** Reads the current state. */
  def get[S, R]: Program[S, R, Nothing, S] = ZPure.get[S]

  /** Replaces the current state with `s`. */
  def set[S, R](s: S): Program[S, R, Nothing, Unit] = EState.set(s)

  /** Same as `set(s)`, but typed with an explicit error channel `E`. */
  def setWithError[S, R, E](s: S): Program[S, R, E, Unit] = EState.set(s)

  /** Replaces the current state with the result of applying `f` to it. */
  def update[S, R](f: S => S): Program[S, R, Nothing, Unit] = EState.update(f.apply)

  /** Reads the current state and projects it through `f`. */
  def inspect[S, R, A](f: S => A): Program[S, R, Nothing, A] = get.map(f(_))

  /** Obtains the service `R` from the environment and projects it through `f`. */
  def inquire[S, R: Tag, A](f: R => A): Program[S, R, Nothing, A] = ZPure.service[S, R].map(f(_))

  /** Emits the log entry `w`. */
  def log[W, S, R, E](w: W): LoggedProgram[W, S, R, E, Unit] = ZPure.log(w)

  /** Overload of `set` for logs: emits every entry of `previouslog` via [[log]]. */
  def set[W, S, R](previouslog: Chunk[W]): LoggedProgram[W, S, R, Nothing, Unit] =
    ZPure.forEach(previouslog)(log(_)).unit

  /** Emits every entry of `previouslog`, then replaces the state with `previousstate`. */
  def restore[W, S, R](previouslog: Chunk[W], previousstate: S): LoggedProgram[W, S, R, Nothing, Unit] =
    set(previouslog) *> set(previousstate)

package cqrs
package domain
package mastermanagement

import zio.*

import eventstore.*

import Error.*
import Error.ValidationError.*
import Event.*
import Fact.*
import Transport.*

/** `BaseSyntax` specialised to the state `Instance`, the error type `Error` and the log entry type
  * `Value`, plus helpers that run a program and convert its log through `transport`.
  */
final class Syntax private (transport: Transport):

  type Program[A]       = BaseSyntax.Program[Instance, Any, Error, A]
  type LoggedProgram[A] = BaseSyntax.LoggedProgram[Value, Instance, Any, Error, A]

  val get                = BaseSyntax.get[Instance, Any]
  val set                = BaseSyntax.set[Instance, Any]
  val update             = BaseSyntax.update[Instance, Any]
  def setWithError       = BaseSyntax.setWithError[Instance, Any, Error]
  def inspect[A]         = BaseSyntax.inspect[Instance, Any, A]

  import BaseSyntax.*

  /** Applies `event` through `transition` and logs it.
    *
    * The transition is invoked with the current `master.latest` plus one; afterwards the (possibly
    * changed) `master` is read again and logged together with the event as a `Value`.
    */
  def lift(event: Event)(using transition: Transition): LoggedProgram[Unit] =
    for
      previous <- inspect(_.master.latest)
      _        <- transition(event, previous + 1)
      m        <- inspect(_.master)
      _        <- log(Value(m, event))
    yield ()

  /** Starting from `instance`, lifts a `SnapshotTaken` event when the accumulated weight of `raws`
    * exceeds `DefaultSnapshotThreshold`.
    *
    * Succeeds with the values logged by that program together with the resulting instance; fails
    * with the cause reported by `runAll` otherwise.
    */
  private def snapshot(raws: RawValues, instance: Instance): ZIO[Any, Any, (Values, Instance)] =
    val withsnapshot =
      for
        _ <- set(instance)
        // Entries of category Created with aggregateLatest > 1 add nothing to the weight; every
        // other entry adds its eventData length plus MinSnapshotEventSize.
        weight = raws.foldLeft(0)((s, r) =>
          if r.eventCategory == Category.Created && r.aggregateLatest > 1 then s
          else s + r.eventData.length + MinSnapshotEventSize
        )
        // NOTE(review): println is a side effect inside an otherwise pure program and looks like
        // leftover debug output -- confirm whether it should go through a logger or be removed.
        _ = println(s"weight $weight $DefaultSnapshotThreshold")
        _ <-
          if weight > DefaultSnapshotThreshold then
            for
              m            <- inspect(_.master)
              currentindex <- inspect(_.index)
              _            <- lift(SnapshotTaken(m.snapshot(currentindex)))
            yield ()
          else unit
      yield ()
    withsnapshot.runAll(instance) match
      case (snapshot, Right(instance, _)) => ZIO.succeed(snapshot, instance)
      case (_, Left(cause))               => ZIO.fail(cause)

  extension (program: LoggedProgram[Unit])

    /** Runs `program` from `Instance.empty`.
      *
      * On success returns the logged values and the final instance. On failure the log is dropped
      * (an empty chunk is returned) and the cause is converted with `toCause`.
      */
    def runFactsEither: (Values, Either[zio.Cause[Error], Instance]) =
      program.runAll(Instance.empty) match
        case (facts, Right(instance, _)) => (facts, Right(instance))
        case (_, Left(cause))            => (Chunk.empty, Left(cause.toCause))

    /** Runs `program`, converts the logged values with `transport.toRawValue(_, tags)` and passes
      * them to `snapshot`.
      *
      * When `snapshot` logged exactly one value, that value is converted as well and appended to
      * the raw values; for any other size the raw values are returned unchanged.
      */
    def runFacts(tags: Tags): ZIO[Any, Any, (RawValues, Instance)] =
      runFactsEither match
        case (facts, Right(instance)) =>
          for
            raws          <- facts.mapZIO(transport.toRawValue(_, tags))
            (s, instance) <- snapshot(raws, instance)
            withsnapshot <-
              if s.size == 1 then transport.toRawValue(s(0), tags).map(raws :+ _)
              else ZIO.succeed(raws)
          yield (withsnapshot, instance)
        case (_, Left(cause)) => ZIO.fail(cause)

  final given instanceTransition: Transition = new Transition

  /** Applies a single event to the current `Instance` state. */
  final class Transition:

    /** Applies `event`, which claims the sequence number `latest`.
      *
      * Every event except `SnapshotTaken` must directly follow the current `master.latest`;
      * otherwise the program fails with `EventNotInSequence`.
      */
    def apply(event: Event, latest: Int): Program[Unit] =
      import Entity.EntityHolder
      for
        previous <- inspect(_.master.latest)
        _ <- event match
          case SnapshotTaken(_) => unit
          case _ =>
            assertThat(
              latest - previous == 1,
              EventNotInSequence(s"expected ${previous + 1}, but found ${latest}, event ${event}")
            )
        _ <- event match
          case Nested(Nested(Nested(nested))) => ...
      yield ()

  ...

end Syntax

object Syntax:

  /** Layer that builds a `Syntax` from the `Transport` service in the environment. */
  val layer = ZLayer.fromZIO(makeLayer)

  private def makeLayer =
    for
      transport <- ZIO.service[Transport]
      result = Syntax(transport)
    yield result

// example usage of domain "mastermanagement"

case class Person(lastname: String, firstname: String, age: Int)
given Codec[Person] = deriveCodec[Person]

case class Matrix(
    m1: Double,
    m2: Double,
    m3: Double,
    m4: Double,
    m5: Double,
    m6: Double,
    m7: Double,
    m8: Double,
    m9: Double,
    m10: Double,
    m11: Double,
    m12: Double
)
given Codec[Matrix] = deriveCodec[Matrix]
object Matrix:
  final val identity = Matrix(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0)

import Properties.*
import Properties.given

/** Example program: creates the master `uuid`, then runs the `modify` sequence of property and
  * index operations through `repeatN(1999)`.
  */
def test(uuid: Uuid): LoggedProgram[Unit] =
  // NOTE(review): `.toList.toString` yields the `List(a, b, ...)` rendering of the 200 characters,
  // not a 200-character string; use `.mkString` if the latter was intended.
  val largebytes = util.Random.alphanumeric.take(200).toList.toString.nn
  val modify =
    for
      _ <- addPropertyMaster("name" -> StringElem("this is my name."))
      _ <- addPropertyMaster("description" -> StringElem("this is my description."))
      _ <- addPropertyVersion("creator" -> StringElem("Bob"))
      _ <- addPropertyRevision("size" -> IntElem(4711))
      _ <- addPropertyIteration("pi" -> DoubleElem(3.14159))
      _ <- addPropertyIteration("somebytes" -> ByteArrayElem("blablabla".getBytes.nn))
      _ <- addPropertyIteration("largebytes" -> StringElem(largebytes))
      _ <- addPropertyIteration("happy" -> BooleanElem(true))
      _ <- addPropertyIteration("noidea" -> NullElem)
      _ <- addPropertyIteration("dontcare" -> UndefinedElem)
      _ <- addPropertyIteration("file" -> AnyElem(File(Uuid.randomUuid, 100, 10, "test.txt")))
      _ <- addPropertyIteration("person" -> AnyElem(Person("Smith", "Joe", 37)))
      _ <- addPropertyIteration("matrix" -> AnyElem(Matrix.identity))
      _ <- newIteration
      _ <- newVersion
      _ <- newRevision
      _ <- newIteration
      _ <- newVersion
      _ <- newIteration
      _ <- removePropertyIteration("pi")
      _ <- addPropertyIteration("e" -> DoubleElem(2.71828))
      _ <- removePropertyMaster("name")
      _ <- deleteIteration
      _ <- selectIndex((2, 1, 1))
      _ <- removePropertyIteration("name")
      _ <- removePropertyIteration("name")
      _ <- newIteration
      _ <- addPropertyIteration("name" -> StringElem("this is my new name."))
      _ <- newVersion
      _ <- selectIndex((1, 1, 2))
      _ <- selectLatest
    yield ()
  for
    _ <- createMaster(uuid)
    _ <- modify.repeatN(1999)
  yield ()

// using Transition to recreate the AggregateRoot

import syntax.*

/** Reads the facts stored for `uuid` and folds them into a program via [[aggregate]]. */
def readById(uuid: Uuid) =
  ZIO.scoped(eventstore.readFactsByAggregateRootId(uuid).flatMap(aggregate(_)))

/** Folds a stream of raw values into a single, not yet executed `ZPure` program.
  *
  * Each raw value is decoded with `transport.fromRawValue`. The fold starts with a program that
  * sets the state to `Instance.empty` and chains one `transition` call per decoded value, using
  * the value's event and `identified.latest`.
  */
def aggregate(
    facts: RawValueStream
)(using transition: Transition): ZIO[Any, StreamingError, ZPure[Nothing, Instance, Instance, Any, Error, Unit]] =
  facts
    .mapZIO(transport.fromRawValue(_))
    .runFold(setWithError(Instance.empty))((s, value) =>
      s.flatMap(_ => transition(value.event, value.identified.latest))
    )