Migrate to [email protected]
-
Add the following packages to
pyproject.toml:minos-microservice-transactions
-
Update the
config.ymlfile as follows:- Replace
minos.aggregate.DatabaseTransactionRepositorybyminos.transactions.DatabaseTransactionRepository. - Replace
minos.aggregate.DatabaseEventRepositorybyminos.aggregate.DatabaseDeltaRepository. - Remove
minos.aggregate.TransactionServiceminos.aggregate.TransactionServicefrom theservicessection. - Add the following fields to the
aggregatesection as follows (replacingMyAggregatewith the corresponding class name):
- Replace
...
aggregate:
client: src.aggregates.MyAggregate
publisher:
client: minos.networks.TransactionalBrokerPublisher
repository: minos.networks.DatabaseBrokerPublisherTransactionRepository
...
...- Update source code as follows:
- Replace
minos.aggregate.Eventasminos.aggregate.Delta. - Replace
minos.aggregate.RootEntitybyminos.aggregate.Entity. - Remove
minos.aggregate.ExternalEntityand use pass the class name asstrto theminos.aggregate.Reftype hint. Here is an example:
- Replace
# Old Approach
class Review(RootEntity):
product: Ref[Product]
class Product(ExternalEntity):
pass# New Approach
class Review(Entity):
product: Ref["src.aggregates.Product"]- Stop using
MyEntity.get,MyEntity.find,MyEntity.save,MyEntity.create,MyEntity.update,MyEntity.delete, etc. methods and replace them byEntityRepository.get,EntityRepository.find,EntityRepository.save,EntityRepository.create,EntityRepository.update,EntityRepository.delete, etc. (Note thatEntityRepositoryis accessible as therepositoryattribute of theMyAggregateclass. TheMyAggregateclass is accesible as theaggregateattribute of theMyCommandServiceandMyQueryServiceclasses. Here is an example:
# Old approach
class MyCommandService:
async def handle(self, request: Request) -> Response:
obj = await MyEntity.create(foo="bar")
return Response(obj)# New approach
class MyCommandService:
async def handle(self, request: Request) -> Response:
obj = await self.aggregate.create_one(foo="bar")
return Response(obj)
class MyAggregate(Aggregate[MyEntity]):
async def create(self, foo: str) -> MyEntity:
obj, delta = await self.repository.create(MyEntity, foo=foo)
await self.publish_domain_event(delta)
return obj- Store sub-
Entityinstances independenty. Here is an example:
# Old Approach
class MyEntity(RootEntity):
entries: EntitySet[MySubEntity]
class MySubEntity(Entity):
number: int
class MyAggregate(Aggregate[Entity]):
async def add_sub_entity(obj: MyEntity, entry: MySubEntity):
obj.entries.add(entry)
await obj.save()# New Approach
class MyEntity(Entity):
entries: EntitySet[Ref[MySubEntity]]
class MySubEntity(Entity):
number: int
class MyAggregate(Aggregate[Entity]):
async def add_sub_entity(obj: MyEntity, entry: MySubEntity):
delta = self.repository.save(entry)
await self.publish_domain_event(delta)
obj.entries.add(entry)
delta = await self.repository.save(obj)
await self.publish_domain_event(delta)
- As methods to directly interact with the database are not directly accesible from
MyEntityanymore, in case of the saga methods, it's necessary to call theMyAggregateclass to access theRepositoryEntityand efectively read or store entities. To do that, use the Dependency Injection system supported by theminos.common.Injectdecorator. Here is an example:
# Old approach
async def commit_callback(context: SagaContext) -> SagaContext:
order = await MyEntity.create(foo=context["bar"])
return SagaContext(order=order)# New approach
@Inject()
async def commit_callback(context: SagaContext, aggregate: MyAggregate) -> SagaContext:
order = await aggregate.create(foo=context["bar"])
return SagaContext(order=order)