query = $query;
        $this->connection = $query->getEntityManager()->getConnection();
        $this->cursorName = uniqid('cursor_');
        assert($this->connection->getDriver() instanceof PDOPgSqlDriver);
    }

    /**
     * Closes the cursor on destruction if it is still marked as open.
     */
    public function __destruct()
    {
        if ($this->isOpen) {
            $this->close();
        }
    }

    /**
     * Builds a query that fetches rows from the cursor, declaring the cursor first if necessary.
     *
     * The returned object is a clone of the wrapped query with its parameters cleared and its
     * SQL replaced by a FETCH statement.
     *
     * @param int    $count     Row count / position; only rendered into the statement for the
     *                          ABSOLUTE, RELATIVE, FORWARD and BACKWARD directions, ignored otherwise.
     * @param string $direction FETCH direction keyword. NOTE(review): it is interpolated into the
     *                          SQL verbatim, so it must be one of the DIRECTION_* constants and
     *                          never untrusted input.
     *
     * @return NativeQuery Query that executes the FETCH statement when run.
     *
     * @throws \BadMethodCallException If the cursor has to be opened and no transaction is active.
     */
    public function getFetchQuery(int $count = 1, string $direction = self::DIRECTION_FORWARD): NativeQuery
    {
        if (!$this->isOpen) {
            $this->openCursor();
        }

        $query = clone $this->query;
        $query->setParameters([]);

        // Only these directions are followed by a numeric argument in the FETCH statement.
        $directionsWithCount = [
            self::DIRECTION_ABSOLUTE,
            self::DIRECTION_RELATIVE,
            self::DIRECTION_FORWARD,
            self::DIRECTION_BACKWARD,
        ];
        $quotedCursorName = $this->connection->quoteIdentifier($this->cursorName);

        if (in_array($direction, $directionsWithCount, true)) {
            $query->setSQL(sprintf('FETCH %s %d FROM %s', $direction, $count, $quotedCursorName));
        } else {
            $query->setSQL(sprintf('FETCH %s FROM %s', $direction, $quotedCursorName));
        }

        return $query;
    }

    /**
     * Closes the cursor if it is open; does nothing otherwise.
     *
     * The CLOSE statement is only sent while the connection is still inside a transaction.
     * The cursor is declared without WITH HOLD, so PostgreSQL has already closed it once the
     * surrounding transaction ended; issuing CLOSE at that point would fail with
     * "cursor does not exist" (and would throw from __destruct()).
     */
    public function close()
    {
        if (!$this->isOpen) {
            return;
        }

        if ($this->connection->getTransactionNestingLevel() > 0) {
            $this->connection->exec('CLOSE ' . $this->connection->quoteIdentifier($this->cursorName));
        }

        $this->isOpen = false;
    }

    /**
     * Declares the cursor for the wrapped query's SQL and marks it as open.
     *
     * The DECLARE statement is executed on a clone of the wrapped query with the wrapped
     * query's parameters, so the wrapped query itself is left untouched.
     *
     * @throws \BadMethodCallException If the connection is not inside a transaction.
     */
    private function openCursor()
    {
        // Same connection object the constructor obtained from the query's entity manager.
        if ($this->connection->getTransactionNestingLevel() === 0) {
            throw new \BadMethodCallException('Cursor must be used inside a transaction');
        }

        $query = clone $this->query;
        $query->setSQL(sprintf(
            'DECLARE %s CURSOR FOR (%s)',
            $this->connection->quoteIdentifier($this->cursorName),
            $this->query->getSQL()
        ));
        $query->execute($this->query->getParameters());

        $this->isOpen = true;
    }
}