Last active
June 12, 2022 20:36
-
-
Save altuntasfatih/cbefa8c25da5f9e2c0f47efef65947d9 to your computer and use it in GitHub Desktop.
FetchTrackedEvents method
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @Override | |
| protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize) { | |
| Assert.isTrue( | |
| lastToken == null || lastToken instanceof GapAwareTrackingToken, | |
| () -> String.format("Token [%s] is of the wrong type. Expected [%s]", | |
| lastToken, GapAwareTrackingToken.class.getSimpleName()) | |
| ); | |
| GapAwareTrackingToken previousToken = cleanedToken((GapAwareTrackingToken) lastToken); | |
| List<Object[]> entries = transactionManager.fetchInTransaction(() -> { | |
| // if there are many gaps, it worthwhile checking if it is possible to clean them up | |
| TypedQuery<Object[]> query; | |
| if (previousToken == null || previousToken.getGaps().isEmpty()) { | |
| query = entityManager().createQuery( | |
| "SELECT e.globalIndex, e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, " | |
| + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData " + | |
| "FROM " + domainEventEntryEntityName() + " e " + | |
| "WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC", Object[].class); | |
| } else { | |
| query = entityManager().createQuery( | |
| "SELECT e.globalIndex, e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, " | |
| + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData " + | |
| "FROM " + domainEventEntryEntityName() + " e " + | |
| "WHERE e.globalIndex > :token OR e.globalIndex IN :gaps ORDER BY e.globalIndex ASC", | |
| Object[].class | |
| ).setParameter("gaps", previousToken.getGaps()); | |
| } | |
| return query.setParameter("token", previousToken == null ? -1L : previousToken.getIndex()) | |
| .setMaxResults(batchSize) | |
| .getResultList(); | |
| }); | |
| ..// The remaining part is related to conversion from Object to TrackedEventData. | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment