Last active
May 9, 2022 14:55
-
-
Save codeliner/14a8d98d53efafdd35e851d76e89cc94 to your computer and use it in GitHub Desktop.
prooph MongoEventStore v7
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| <?php | |
| declare(strict_types = 1); | |
| namespace Acme\Infrastructure\MongoDb; | |
| use Iterator; | |
| use MongoDB\Collection; | |
| use MongoDB\Driver\Exception\BulkWriteException; | |
| use MongoDB\Operation\FindOneAndUpdate; | |
| use Prooph\Common\Messaging\Message; | |
| use Prooph\Common\Messaging\MessageConverter; | |
| use Prooph\Common\Messaging\MessageFactory; | |
| use Prooph\EventStore\EventStore; | |
| use Prooph\EventStore\Exception\StreamNotFound; | |
| use Prooph\EventStore\Metadata\MetadataMatcher; | |
| use Prooph\EventStore\Metadata\Operator; | |
| use Prooph\EventStore\Stream; | |
| use Prooph\EventStore\StreamName; | |
/**
 * Event store that persists prooph event streams in MongoDB.
 *
 * Storage layout used by this class:
 *  - the `streams` collection holds one document per stream with `_id` (the stream name), `metadata`
 *    and `seq` (the number handed out to the most recently appended event);
 *  - the events of a stream live in a collection named after the stream; every event document carries
 *    its sequence number in the `no` field.
 *
 * None of the multi-step operations (append, delete) runs inside a transaction.
 */
final class MongoEventStore implements EventStore
{
    public const STREAM_COLLECTION = 'streams';

    /**
     * Format used to persist `created_at`; values are written and read as UTC.
     */
    private const CREATED_AT_FORMAT = 'Y-m-d\TH:i:s.u';

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * Names of the streams that additionally get the unique aggregate id/version index.
     *
     * @var array
     */
    private $aggregateStreamNames;

    /**
     * @param MongoConnection  $mongoConnection      Used to select the stream and event collections.
     * @param MessageFactory   $messageFactory       Rebuilds messages from stored event documents.
     * @param MessageConverter $messageConverter     Turns messages into arrays before they are stored.
     * @param array            $aggregateStreamNames Stream names (plain strings, compared strictly against
     *                                               StreamName::toString()) that get the aggregate index.
     */
    public function __construct(
        MongoConnection $mongoConnection,
        MessageFactory $messageFactory,
        MessageConverter $messageConverter,
        array $aggregateStreamNames
    ) {
        $this->mongoConnection = $mongoConnection;
        $this->messageFactory = $messageFactory;
        $this->messageConverter = $messageConverter;
        $this->aggregateStreamNames = $aggregateStreamNames;
    }

    /**
     * Returns the metadata stored for the given stream.
     *
     * NOTE(review): the `array` return type assumes the connection decodes documents to PHP arrays
     * (the event mapping closure in this class makes the same assumption) - confirm in MongoConnection.
     *
     * @throws StreamNotFound When no stream document exists for the given name.
     */
    public function fetchStreamMetadata(StreamName $streamName): array
    {
        $streamDoc = $this->streams()->findOne(['_id' => $streamName->toString()]);

        if (null === $streamDoc) {
            throw StreamNotFound::with($streamName);
        }

        return $streamDoc['metadata'];
    }

    /**
     * Tells whether a stream document exists for the given name.
     */
    public function hasStream(StreamName $streamName): bool
    {
        // findOne() with an _id-only projection replaces Collection::count(), which newer releases of
        // the MongoDB PHP library deprecate; the answer is the same because _id is unique.
        $streamDoc = $this->streams()->findOne(
            ['_id' => $streamName->toString()],
            ['projection' => ['_id' => 1]]
        );

        return null !== $streamDoc;
    }

    /**
     * Registers a new, empty stream with a sequence counter of 0.
     *
     * @throws \RuntimeException When the given stream already carries events.
     */
    public function create(Stream $stream): void
    {
        // Looking at the first element is enough to know whether there are events; counting would
        // needlessly walk the whole iterator.
        $events = $stream->streamEvents();
        $events->rewind();

        if ($events->valid()) {
            throw new \RuntimeException(
                __CLASS__ . ' does not support creating a stream and appending events in one operation.'
            );
        }

        $this->streams()->insertOne([
            '_id' => $stream->streamName()->toString(),
            'metadata' => $stream->metadata(),
            'seq' => 0,
        ]);
    }

    /**
     * Appends at most one event to the given stream.
     *
     * @throws \RuntimeException When more than one event is passed; nothing is written in that case.
     * @throws StreamNotFound    When the stream has not been created.
     */
    public function appendTo(StreamName $streamName, Iterator $streamEvents): void
    {
        // Single pass over the iterator: counting it first and iterating it again afterwards fails for
        // iterators that cannot be rewound (e.g. generators).
        $pending = [];

        foreach ($streamEvents as $event) {
            $pending[] = $event;

            if (count($pending) > 1) {
                throw new \RuntimeException(
                    'Due to limited ACID support you can only append one event per operation to the event stream: '
                    . $streamName->toString()
                );
            }
        }

        foreach ($pending as $event) {
            $this->insertInto($streamName, $this->prepareEventData($event));
        }
    }

    /**
     * Loads events with `no >= $fromNumber` in ascending order.
     *
     * A $count of null or 0 means "no limit".
     */
    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream {
        return $this->loadEvents($streamName, ['$gte' => $fromNumber], 1, $count, $metadataMatcher);
    }

    /**
     * Loads events with `no <= $fromNumber` in descending order.
     *
     * A $count of null or 0 means "no limit".
     */
    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream {
        return $this->loadEvents($streamName, ['$lte' => $fromNumber], -1, $count, $metadataMatcher);
    }

    /**
     * Drops the event collection of the stream and removes its stream document.
     *
     * Note: this is not transaction-safe. If the drop succeeds but the second command fails, the stream
     * document remains, so hasStream() can still return true although the event collection is gone.
     */
    public function delete(StreamName $streamName): void
    {
        $name = $streamName->toString();

        $this->mongoConnection->selectCollection($name)->drop();
        $this->streams()->deleteOne(['_id' => $name]);
    }

    /**
     * Shortcut for the collection holding the stream documents.
     */
    private function streams(): Collection
    {
        return $this->mongoConnection->selectCollection(self::STREAM_COLLECTION);
    }

    /**
     * Shared implementation of load() and loadReverse().
     *
     * @param array $numberConstraint Mongo condition applied to the `no` field.
     * @param int   $sortDirection    1 for ascending, -1 for descending order of `no`.
     */
    private function loadEvents(
        StreamName $streamName,
        array $numberConstraint,
        int $sortDirection,
        ?int $count,
        ?MetadataMatcher $metadataMatcher
    ): Stream {
        $collection = $this->getCollectionByStreamName($streamName);

        $query = $this->buildQuery($metadataMatcher ?? new MetadataMatcher());
        // The sequence constraint always wins over a matcher entry that targets `no`.
        $query['no'] = $numberConstraint;

        $options = ['sort' => ['no' => $sortDirection]];

        if (null !== $count && 0 !== $count) {
            $options['limit'] = $count;
        }

        $cursor = $collection->find($query, $options);

        $events = $this->mapIterator($cursor, function (array $eventData): Message {
            return $this->eventDataToMessage($eventData);
        });

        return new Stream($streamName, $events);
    }

    /**
     * Converts a message into the document stored in the event collection (without the `no` field).
     */
    private function prepareEventData(Message $e): array
    {
        $eventArr = $this->messageConverter->convertToArray($e);

        $createdAt = $eventArr['created_at'];

        if ($createdAt instanceof \DateTime) {
            // Work on an immutable copy so the timezone switch below cannot alter the caller's object.
            $createdAt = \DateTimeImmutable::createFromMutable($createdAt);
        }

        // eventDataToMessage() interprets the stored string as UTC, so it has to be written as UTC.
        $createdAt = $createdAt->setTimezone(new \DateTimeZone('UTC'));

        return [
            '_id' => $eventArr['uuid'],
            'event_name' => $eventArr['message_name'],
            'payload' => $eventArr['payload'],
            'created_at' => $createdAt->format(self::CREATED_AT_FORMAT),
            'metadata' => $eventArr['metadata'],
        ];
    }

    /**
     * Rebuilds a message from a stored event document.
     *
     * @throws \RuntimeException When the stored `created_at` value does not match the expected format.
     */
    private function eventDataToMessage(array $eventData): Message
    {
        $createdAt = \DateTimeImmutable::createFromFormat(
            self::CREATED_AT_FORMAT,
            $eventData['created_at'],
            new \DateTimeZone('UTC')
        );

        if (false === $createdAt) {
            throw new \RuntimeException('Stored event ' . $eventData['_id'] . ' has an invalid created_at value.');
        }

        return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
            'uuid' => $eventData['_id'],
            'created_at' => $createdAt,
            'payload' => $eventData['payload'],
            'metadata' => $eventData['metadata'],
        ]);
    }

    /**
     * Selects the event collection of a stream and makes sure its indexes exist.
     *
     * Every stream gets a unique index on `no`; streams listed in $aggregateStreamNames additionally get
     * a unique index on aggregate id + aggregate version.
     */
    private function getCollectionByStreamName(StreamName $streamName): Collection
    {
        $name = $streamName->toString();
        $collection = $this->mongoConnection->selectCollection($name);

        $collection->createIndex(['no' => 1], ['unique' => true, 'name' => 'no_idx']);

        if (in_array($name, $this->aggregateStreamNames, true)) {
            $collection->createIndex(
                ['metadata._aggregate_id' => 1, 'metadata._aggregate_version' => 1],
                ['unique' => true, 'name' => 'aggregate_version_idx']
            );
        }

        return $collection;
    }

    /**
     * Translates the matcher's constraints into a MongoDB filter.
     *
     * Several constraints on the same field are combined (e.g. `$gt` and `$lt`) instead of the last one
     * replacing the others.
     *
     * NOTE(review): fields are matched as given, i.e. at the top level of the event document, while
     * prepareEventData() stores metadata under `metadata` - confirm whether callers pass prefixed names.
     *
     * @throws \InvalidArgumentException When the matcher contains an operator without a MongoDB mapping.
     */
    private function buildQuery(MetadataMatcher $matcher): array
    {
        $operatorMap = [
            Operator::EQUALS => '$eq',
            Operator::GREATER_THAN => '$gt',
            Operator::GREATER_THAN_EQUALS => '$gte',
            Operator::LOWER_THAN => '$lt',
            Operator::LOWER_THAN_EQUALS => '$lte',
            Operator::NOT_EQUALS => '$ne',
        ];

        $query = [];

        foreach ($matcher->data() as $match) {
            $operator = $match['operator']->getValue();

            if (!isset($operatorMap[$operator])) {
                // Passing an unmapped operator on would build a filter that silently matches nothing.
                throw new \InvalidArgumentException('Unsupported metadata operator: ' . $operator);
            }

            $query[$match['field']][$operatorMap[$operator]] = $match['value'];
        }

        return $query;
    }

    /**
     * Reserves the next sequence number of the stream and stores the event under it.
     *
     * Not transaction-safe: the counter is incremented first and decremented again if the insert fails.
     *
     * @throws StreamNotFound     When no stream document exists for the given name.
     * @throws BulkWriteException When the event cannot be inserted (e.g. a unique index is violated).
     */
    private function insertInto(StreamName $streamName, array $eventData): void
    {
        $name = $streamName->toString();

        $streamInfo = $this->streams()->findOneAndUpdate(
            ['_id' => $name],
            ['$inc' => ['seq' => 1]],
            ['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
        );

        if (null === $streamInfo) {
            // Without a stream document there is no counter; inserting anyway would store `no = null`.
            throw StreamNotFound::with($streamName);
        }

        $eventData['no'] = $streamInfo['seq'];

        try {
            $this->getCollectionByStreamName($streamName)->insertOne($eventData);
        } catch (BulkWriteException $e) {
            // Give the reserved sequence number back before reporting the failure.
            $this->streams()->updateOne(['_id' => $name], ['$inc' => ['seq' => -1]]);

            throw $e;
        }
    }

    /**
     * Wraps an iterable so that every element is passed through $callback when it is read.
     *
     * The callback receives the current value, the current key and the inner iterator.
     */
    private function mapIterator(iterable $iterable, callable $callback): \IteratorIterator
    {
        return new class($iterable, $callback) extends \IteratorIterator {
            /**
             * The function applied to every element of the inner iterator.
             *
             * @var callable
             */
            private $callable;

            /**
             * @param iterable $iterable Arrays are wrapped in an ArrayIterator first.
             * @param callable $callable
             */
            public function __construct(iterable $iterable, callable $callable)
            {
                if (is_array($iterable)) {
                    $iterable = new \ArrayIterator($iterable);
                }

                parent::__construct($iterable);
                $this->callable = $callable;
            }

            /**
             * Returns the mapped value of the current element.
             *
             * The attribute below silences the PHP 8.1+ return type deprecation; older PHP versions
             * read that line as a comment.
             */
            #[\ReturnTypeWillChange]
            public function current()
            {
                $iterator = $this->getInnerIterator();
                $callback = $this->callable;

                return $callback(parent::current(), parent::key(), $iterator);
            }
        };
    }
}
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: This is an example of a MongoDb event store implementing the Prooph\EventStore v7 basic interface.
Due to limited ACID support we need to use some "dirty" hacks and accept some limitations.
Limitations:
Dirty:
- MongoEventStore::insertInto maintains a sequence per stream but is not transaction-safe
- MongoEventStore::delete is not transaction-safe

When and why should you use the MongoEventStore