eventStore = $eventStore; $this->name = $name; $this->readModel = $readModel; $this->mongoConnection = $mongoConnection; $this->cacheSize = $cacheSize; $this->persistBlockSize = $persistBlockSize; } /** * The callback has to return an array */ public function init(Closure $callback): ReadModelProjection { if (null !== $this->initCallback) { throw new \RuntimeException('Projection already initialized'); } $callback = Closure::bind($callback, $this->createHandlerContext($this->currentStreamName)); $result = $callback(); if (is_array($result)) { $this->state = $result; } $this->initCallback = $callback; return $this; } public function fromStream(string $streamName): ReadModelProjection { if (null !== $this->streamPositions) { throw new \RuntimeException('From was already called'); } $this->streamPositions = [$streamName => 0]; return $this; } public function fromStreams(string ...$streamNames): ReadModelProjection { if (null !== $this->streamPositions) { throw new \RuntimeException('From was already called'); } foreach ($streamNames as $streamName) { $this->streamPositions[$streamName] = 0; } return $this; } public function when(array $handlers): ReadModelProjection { if (null !== $this->handler || ! empty($this->handlers)) { throw new \RuntimeException('When was already called'); } foreach ($handlers as $eventName => $handler) { if (! is_string($eventName)) { throw new \InvalidArgumentException('Invalid event name given, string expected'); } if (! $handler instanceof Closure) { throw new \InvalidArgumentException('Invalid handler given, Closure expected'); } $this->handlers[$eventName] = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName)); } return $this; } public function whenAny(Closure $handler): ReadModelProjection { if (null !== $this->handler || ! empty($this->handlers)) { throw new \RuntimeException('When was already called'); } $this->handler = Closure::bind($handler, $this->createHandlerContext($this->currentStreamName)); return $this; } public function stop(): void { $this->isStopped = true; } public function getState(): array { return $this->state; } public function getName(): string { return $this->name; } public function delete(bool $deleteProjection): void { if($deleteProjection) { $this->readModel->delete(); } } public function readModel(): ReadModel { return $this->readModel; } public function run(bool $keepRunning = true, ?int $usleep = 100): void { $this->createProjectionIfNotExist(); $this->acquireLock(); try { do { $this->load(); $singleHandler = null !== $this->handler; foreach ($this->streamPositions as $streamName => $position) { try { $stream = $this->eventStore->load(new StreamName($streamName), $position + 1); } catch (StreamNotFound $e) { // no newer events found continue; } if ($singleHandler) { $this->handleStreamWithSingleHandler($streamName, $stream->streamEvents()); } else { $this->handleStreamWithHandlers($streamName, $stream->streamEvents()); } if ($this->isStopped) { break; } } if (0 === $this->eventCounter) { if (null !== $usleep) { usleep($usleep); } } else { $this->persist(); } $this->eventCounter = 0; } while ($keepRunning && ! $this->isStopped); } finally { $this->releaseLock(); } } protected function createProjectionIfNotExist(): void { $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION); try { $col->insertOne([ '_id' => $this->name, 'position' => [], 'state' => [], 'locked_until' => null ]); } catch (\Throwable $ex) { //ignore errors especially duplicate key errors } } protected function load(): void { $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION); $doc = $col->findOne(['_id' => $this->name]); if (!$doc) { throw new \RuntimeException('Projection information for ' . $this->name . ' missing in collection ' . self::PROJECTIONS_COLLECTION); } $this->streamPositions = $doc['position']; $state = $doc['state']; if (! empty($state)) { $this->state = $state; } } protected function persist(): void { $this->readModel()->persist(); $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC')); $lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u'); $col = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION); $col->updateOne([ '_id' => $this->name ], [ '$set' => [ 'position' => $this->streamPositions, 'state' => $this->state, 'locked_until' => $lockUntilString ] ]); } public function reset(): void { $this->createProjectionIfNotExist(); if (null !== $this->streamPositions) { $this->streamPositions = array_map( function (): int { return 0; }, $this->streamPositions ); } $this->isStopped = false; $callback = $this->initCallback; if (is_callable($callback)) { $result = $callback(); if (is_array($result)) { $this->state = $result; } } else { $this->state = []; } $this->eventStore->delete(new StreamName($this->name)); $this->readModel->reset(); $this->acquireLock(); $this->persist(); $this->releaseLock(); } /** * @throws RuntimeException */ protected function acquireLock(): void { $now = new \DateTimeImmutable('now', new \DateTimeZone('UTC')); $nowString = $now->format('Y-m-d\TH:i:s.u'); $lockUntilString = $now->modify('+' . self::LOCK_TIMEOUT_MS . ' ms')->format('Y-m-d\TH:i:s.u'); $result = $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([ '_id' => $this->name, '$or' => [ [ 'locked_until' => null, ], [ 'locked_until' => ['$lt' => $nowString] ] ] ], [ '$set' => [ 'locked_until' => $lockUntilString ] ]); if ($result->getMatchedCount() !== 1) { throw new RuntimeException('Another projection process is already running'); } } protected function releaseLock(): void { $this->mongoConnection->selectCollection(self::PROJECTIONS_COLLECTION)->updateOne([ '_id' => $this->name, ], [ '$set' => [ 'locked_until' => null ] ]); } public function fromCategory(string $name): ReadModelProjection { throw new BadMethodCallException(__METHOD__ . ' not supported'); } public function fromCategories(string ...$names): ReadModelProjection { throw new BadMethodCallException(__METHOD__ . ' not supported'); } public function fromAll(): ReadModelProjection { throw new BadMethodCallException(__METHOD__ . ' not supported'); } private function handleStreamWithSingleHandler(string $streamName, \Iterator $events): void { $this->currentStreamName = $streamName; $handler = $this->handler; foreach ($events as $event) { /* @var Message $event */ $this->streamPositions[$streamName]++; $this->eventCounter++; $result = $handler($this->state, $event); if (is_array($result)) { $this->state = $result; } if ($this->eventCounter === $this->persistBlockSize) { $this->persist(); $this->eventCounter = 0; } if ($this->isStopped) { break; } } } private function handleStreamWithHandlers(string $streamName, \Iterator $events): void { $this->currentStreamName = $streamName; foreach ($events as $event) { /* @var Message $event */ $this->streamPositions[$streamName]++; $this->eventCounter++; if (! isset($this->handlers[$event->messageName()])) { continue; } $handler = $this->handlers[$event->messageName()]; $result = $handler($this->state, $event); if (is_array($result)) { $this->state = $result; } if ($this->eventCounter === $this->persistBlockSize) { $this->persist(); $this->eventCounter = 0; } if ($this->isStopped) { break; } } } private function createHandlerContext(?string &$streamName) { return new class($this, $streamName) { /** * @var ReadModelProjection */ private $projection; /** * @var ?string */ private $streamName; public function __construct(ReadModelProjection $projection, ?string &$streamName) { $this->projection = $projection; $this->streamName = &$streamName; } public function stop(): void { $this->projection->stop(); } public function readModel(): ReadModel { return $this->projection->readModel(); } public function streamName(): ?string { return $this->streamName; } }; } }