Skip to content

Instantly share code, notes, and snippets.

@codeliner
Last active May 9, 2022 14:55
Show Gist options
  • Select an option

  • Save codeliner/14a8d98d53efafdd35e851d76e89cc94 to your computer and use it in GitHub Desktop.

Select an option

Save codeliner/14a8d98d53efafdd35e851d76e89cc94 to your computer and use it in GitHub Desktop.
prooph MongoEventStore v7
<?php
declare(strict_types = 1);
namespace Acme\Infrastructure\MongoDb;
use Iterator;
use MongoDB\Collection;
use MongoDB\Driver\Exception\BulkWriteException;
use MongoDB\Operation\FindOneAndUpdate;
use Prooph\Common\Messaging\Message;
use Prooph\Common\Messaging\MessageConverter;
use Prooph\Common\Messaging\MessageFactory;
use Prooph\EventStore\EventStore;
use Prooph\EventStore\Exception\StreamNotFound;
use Prooph\EventStore\Metadata\MetadataMatcher;
use Prooph\EventStore\Metadata\Operator;
use Prooph\EventStore\Stream;
use Prooph\EventStore\StreamName;
/**
 * MongoDB backed implementation of the prooph EventStore (v7) interface.
 *
 * Every stream is stored in its own collection named after the stream. A
 * document per stream in the self::STREAM_COLLECTION collection keeps the
 * stream metadata and a sequence counter (`seq`) used to number events (`no`).
 *
 * Limitations (no multi-document transactions are used):
 *  - a stream cannot be created together with its first events,
 *  - only one event can be appended per appendTo() call,
 *  - insertInto() and delete() are not transaction-safe.
 */
final class MongoEventStore implements EventStore
{
    /**
     * Collection holding one document per stream: `_id` (stream name), `metadata`, `seq`.
     */
    public const STREAM_COLLECTION = 'streams';

    /**
     * Storage format of an event's creation time (microsecond precision, no zone; always UTC).
     */
    private const CREATED_AT_FORMAT = 'Y-m-d\TH:i:s.u';

    /**
     * @var MongoConnection
     */
    private $mongoConnection;

    /**
     * @var MessageFactory
     */
    private $messageFactory;

    /**
     * @var MessageConverter
     */
    private $messageConverter;

    /**
     * Names of streams that additionally get the unique aggregate id/version index.
     *
     * @var array
     */
    private $aggregateStreamNames;

    /**
     * @param MongoConnection  $mongoConnection      Used to select the stream collections.
     * @param MessageFactory   $messageFactory       Turns stored event documents back into messages.
     * @param MessageConverter $messageConverter     Turns messages into arrays for storage.
     * @param array            $aggregateStreamNames Stream names that get the unique
     *                                               `_aggregate_id` + `_aggregate_version` index.
     */
    public function __construct(
        MongoConnection $mongoConnection,
        MessageFactory $messageFactory,
        MessageConverter $messageConverter,
        array $aggregateStreamNames
    ) {
        $this->mongoConnection = $mongoConnection;
        $this->messageFactory = $messageFactory;
        $this->messageConverter = $messageConverter;
        $this->aggregateStreamNames = $aggregateStreamNames;
    }

    /**
     * Returns the metadata stored for the given stream.
     *
     * NOTE(review): the declared array return type presumably relies on the
     * connection's type map returning PHP arrays - confirm against MongoConnection.
     *
     * @throws StreamNotFound If no stream document exists for the name.
     */
    public function fetchStreamMetadata(StreamName $streamName): array
    {
        $doc = $this->mongoConnection
            ->selectCollection(self::STREAM_COLLECTION)
            ->findOne(['_id' => $streamName->toString()]);

        if (null === $doc) {
            throw StreamNotFound::with($streamName);
        }

        return $doc['metadata'];
    }

    /**
     * Tells whether a stream document exists for the given name.
     */
    public function hasStream(StreamName $streamName): bool
    {
        // A projected findOne replaces the deprecated Collection::count()
        // and stops at the first (and only possible) match on `_id`.
        $doc = $this->mongoConnection
            ->selectCollection(self::STREAM_COLLECTION)
            ->findOne(
                ['_id' => $streamName->toString()],
                ['projection' => ['_id' => 1]]
            );

        return null !== $doc;
    }

    /**
     * Registers a new, empty stream with its metadata and a sequence starting at 0.
     *
     * @throws \RuntimeException If the stream already carries events.
     */
    public function create(Stream $stream): void
    {
        if (iterator_count($stream->streamEvents()) > 0) {
            throw new \RuntimeException(__CLASS__ . ' does not support creating a stream and appending events in one operation.');
        }

        $streamDoc = [
            '_id' => $stream->streamName()->toString(),
            'metadata' => $stream->metadata(),
            'seq' => 0,
        ];

        $this->mongoConnection
            ->selectCollection(self::STREAM_COLLECTION)
            ->insertOne($streamDoc);
    }

    /**
     * Appends at most one event to an existing stream.
     *
     * @throws \RuntimeException If more than one event is passed.
     * @throws StreamNotFound    If the stream was never created.
     */
    public function appendTo(StreamName $streamName, Iterator $streamEvents): void
    {
        // Materialise once: counting first and iterating afterwards would walk
        // the iterator twice, which fails for iterators that cannot be rewound
        // (for example generators).
        $events = iterator_to_array($streamEvents, false);

        if (count($events) > 1) {
            throw new \RuntimeException('Due to limited ACID support you can only append one event per operation to the event stream: ' . $streamName->toString());
        }

        foreach ($events as $event) {
            $this->insertInto($streamName, $this->prepareEventData($event));
        }
    }

    /**
     * Loads events with `no` >= $fromNumber in ascending order.
     *
     * @param int|null $count Maximum number of events, null for no limit.
     */
    public function load(
        StreamName $streamName,
        int $fromNumber = 1,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream {
        return $this->loadEvents($streamName, ['$gte' => $fromNumber], 1, $count, $metadataMatcher);
    }

    /**
     * Loads events with `no` <= $fromNumber in descending order.
     *
     * @param int|null $count Maximum number of events, null for no limit.
     */
    public function loadReverse(
        StreamName $streamName,
        int $fromNumber = PHP_INT_MAX,
        ?int $count = null,
        ?MetadataMatcher $metadataMatcher = null
    ): Stream {
        return $this->loadEvents($streamName, ['$lte' => $fromNumber], -1, $count, $metadataMatcher);
    }

    /**
     * Drops the stream's event collection and removes its stream document.
     */
    public function delete(StreamName $streamName): void
    {
        // Note: this is not transaction-safe.
        // However, delete should only be called for projection streams, and MongoDB recreates an empty
        // stream collection if it does not exist. So self::hasStream can return true even if only the
        // reference in the streams collection is left (scenario: first command succeeds, second fails).
        $this->mongoConnection
            ->selectCollection($streamName->toString())
            ->drop();

        $this->mongoConnection
            ->selectCollection(self::STREAM_COLLECTION)
            ->deleteOne(['_id' => $streamName->toString()]);
    }

    /**
     * Shared implementation of load() and loadReverse().
     *
     * @param array    $numberCondition Mongo condition applied to the `no` field.
     * @param int      $sortDirection   1 for ascending, -1 for descending order by `no`.
     * @param int|null $count           Maximum number of events, null for no limit.
     */
    private function loadEvents(
        StreamName $streamName,
        array $numberCondition,
        int $sortDirection,
        ?int $count,
        ?MetadataMatcher $metadataMatcher
    ): Stream {
        $collection = $this->getCollectionByStreamName($streamName);

        $query = $this->buildQuery($metadataMatcher ?? new MetadataMatcher());
        $query['no'] = $numberCondition;

        $options = ['sort' => ['no' => $sortDirection]];

        if (null !== $count) {
            $options['limit'] = $count;
        }

        $cursor = $collection->find($query, $options);

        // NOTE(review): the array type hint presumably relies on the connection's
        // type map returning documents as PHP arrays - confirm against MongoConnection.
        $iterator = $this->mapIterator($cursor, function (array $event) {
            return $this->eventDataToMessage($event);
        });

        return new Stream($streamName, $iterator);
    }

    /**
     * Converts a message into the document stored in the stream collection.
     *
     * @param Message $e
     * @return array
     */
    private function prepareEventData(Message $e): array
    {
        $eventArr = $this->messageConverter->convertToArray($e);

        $createdAt = $eventArr['created_at'];

        if ($createdAt instanceof \DateTime) {
            // Work on an immutable copy so the message's own date is never modified.
            $createdAt = \DateTimeImmutable::createFromMutable($createdAt);
        }

        // The stored string carries no zone and is parsed back as UTC in
        // eventDataToMessage(), so it has to be written in UTC as well.
        $createdAt = $createdAt->setTimezone(new \DateTimeZone('UTC'));

        return [
            '_id' => $eventArr['uuid'],
            'event_name' => $eventArr['message_name'],
            'payload' => $eventArr['payload'],
            'created_at' => $createdAt->format(self::CREATED_AT_FORMAT),
            'metadata' => $eventArr['metadata'],
        ];
    }

    /**
     * Rebuilds a message from a stored event document.
     *
     * @throws \RuntimeException If the stored creation time cannot be parsed.
     */
    private function eventDataToMessage(array $eventData): Message
    {
        $createdAt = \DateTimeImmutable::createFromFormat(
            self::CREATED_AT_FORMAT,
            $eventData['created_at'],
            new \DateTimeZone('UTC')
        );

        if (false === $createdAt) {
            throw new \RuntimeException('Stored event has an invalid created_at value.');
        }

        return $this->messageFactory->createMessageFromArray($eventData['event_name'], [
            'uuid' => $eventData['_id'],
            'created_at' => $createdAt,
            'payload' => $eventData['payload'],
            'metadata' => $eventData['metadata'],
        ]);
    }

    /**
     * Selects the stream's event collection and requests its indexes.
     *
     * createIndex() is issued on every call: a unique index on `no` and, for
     * configured aggregate streams, a unique index on aggregate id + version.
     */
    private function getCollectionByStreamName(StreamName $streamName): Collection
    {
        $name = $streamName->toString();
        $collection = $this->mongoConnection->selectCollection($name);

        $collection->createIndex(
            ['no' => 1],
            ['unique' => true, 'name' => 'no_idx']
        );

        if (in_array($name, $this->aggregateStreamNames, true)) {
            $collection->createIndex(
                [
                    'metadata._aggregate_id' => 1,
                    'metadata._aggregate_version' => 1,
                ],
                ['unique' => true, 'name' => 'aggregate_version_idx']
            );
        }

        return $collection;
    }

    /**
     * Translates the matcher's conditions into a MongoDB filter document.
     *
     * Several conditions on the same field are combined (e.g. `$gte` and `$lte`)
     * instead of the last one replacing the earlier ones.
     *
     * NOTE(review): the field name is used as-is; it is not prefixed with
     * `metadata.` - confirm callers pass the full document path.
     *
     * @throws \InvalidArgumentException If the matcher uses an operator without a MongoDB mapping.
     */
    private function buildQuery(MetadataMatcher $matcher): array
    {
        $operatorMap = [
            Operator::EQUALS => '$eq',
            Operator::GREATER_THAN => '$gt',
            Operator::GREATER_THAN_EQUALS => '$gte',
            Operator::LOWER_THAN => '$lt',
            Operator::LOWER_THAN_EQUALS => '$lte',
            Operator::NOT_EQUALS => '$ne',
        ];

        $query = [];

        foreach ($matcher->data() as $match) {
            $operator = $match['operator']->getValue();

            if (!isset($operatorMap[$operator])) {
                // Passing an unmapped operator on would silently build a wrong filter.
                throw new \InvalidArgumentException('Unsupported metadata matcher operator: ' . $operator);
            }

            $query[$match['field']][$operatorMap[$operator]] = $match['value'];
        }

        return $query;
    }

    /**
     * Assigns the next sequence number to the event and stores it.
     *
     * Not transaction-safe: the sequence is incremented first and the event is
     * written in a second step. If the write fails the sequence is rolled back
     * on a best-effort basis.
     *
     * @throws StreamNotFound     If no stream document exists for the name.
     * @throws BulkWriteException If the event cannot be inserted (rethrown).
     */
    private function insertInto(StreamName $streamName, array $eventData): void
    {
        $streams = $this->mongoConnection->selectCollection(self::STREAM_COLLECTION);

        $streamInfo = $streams->findOneAndUpdate(
            ['_id' => $streamName->toString()],
            ['$inc' => ['seq' => 1]],
            ['returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER]
        );

        if (null === $streamInfo) {
            // Without this guard the event would be stored with `no` = null
            // in a collection of a stream that was never created.
            throw StreamNotFound::with($streamName);
        }

        $eventData['no'] = $streamInfo['seq'];

        try {
            $this->getCollectionByStreamName($streamName)->insertOne($eventData);
        } catch (BulkWriteException $e) {
            // Roll back only while the sequence still holds the number taken above.
            // If another writer advanced it in the meantime, decrementing would hand
            // out an already used number and every following append would fail on
            // the unique `no` index; a gap in the numbering is the lesser evil.
            $streams->updateOne(
                ['_id' => $streamName->toString(), 'seq' => $eventData['no']],
                ['$inc' => ['seq' => -1]]
            );

            throw $e;
        }
    }

    /**
     * Wraps an iterable so that each element is passed through $callback when read.
     *
     * The callback receives the current value, the current key and the inner iterator.
     */
    private function mapIterator(iterable $iterable, callable $callback): \IteratorIterator
    {
        return new class($iterable, $callback) extends \IteratorIterator {
            /**
             * The function applied to every element of the inner iterator.
             *
             * @var callable
             */
            private $callable;

            /**
             * @param iterable $iterable Arrays are wrapped in an ArrayIterator first.
             * @param callable $callable
             */
            public function __construct(iterable $iterable, callable $callable)
            {
                if (is_array($iterable)) {
                    $iterable = new \ArrayIterator($iterable);
                }

                parent::__construct($iterable);
                $this->callable = $callable;
            }

            /**
             * Returns the mapped value of the current element.
             *
             * The attribute below is a plain comment before PHP 8 and silences
             * the missing return type deprecation from PHP 8.1 on.
             */
            #[\ReturnTypeWillChange]
            public function current()
            {
                $iterator = $this->getInnerIterator();
                $callback = $this->callable;

                return $callback(parent::current(), parent::key(), $iterator);
            }
        };
    }
}
@codeliner
Copy link
Author

Note: This is an example of a MongoDb event store implementing the Prooph\EventStore v7 basic interface.
Due to limited ACID support we need to use some "dirty" hacks and accept some limitations.

Limitations:

  • You cannot create a new stream and append events to it in one operation.
  • You cannot append more than one event to a stream in one operation.

Dirty:

  • MongoEventStore::insertInto maintains a sequence per stream but is not transaction-safe
  • MongoEventStore::delete is not transaction-safe

When and why should you use the MongoEventStore

  • If you <3 MongoDb for projections, want to use a single database for your system and you can live with the tradeoffs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment