Skip to content

Instantly share code, notes, and snippets.

@altuntasfatih
Created June 11, 2022 12:09
Show Gist options
  • Select an option

  • Save altuntasfatih/e27c20bd938f6f13f54d6ac4f0b98396 to your computer and use it in GitHub Desktop.

Select an option

Save altuntasfatih/e27c20bd938f6f13f54d6ac4f0b98396 to your computer and use it in GitHub Desktop.
Overridden event storage engine
/**
 * Loads up to {@code batchSize} raw event rows that follow {@code previousToken}.
 *
 * <p>Two queries are issued against the domain event entry entity:
 * <ol>
 *   <li>if the token carries gaps, the rows whose {@code globalIndex} is one of those gaps;</li>
 *   <li>the rows whose {@code globalIndex} is greater than the token's index
 *       (or greater than {@code -1} when there is no token).</li>
 * </ol>
 * Gap rows come first in the returned list; each query is ordered by {@code globalIndex} ascending.
 *
 * <p>The second query is only executed while there is capacity left in the batch. Passing a limit
 * of {@code 0} to {@code setMaxResults} is provider-dependent (some providers, reportedly
 * Hibernate 5.x, interpret it as "no limit"), so it is avoided entirely.
 *
 * @param previousToken the last token that was processed, or {@code null} to read from the start
 * @param batchSize     the maximum number of rows to return
 * @return the selected columns of each matching row, in the order listed in the SELECT clause
 */
private List<Object[]> getEvents(GapAwareTrackingToken previousToken, int batchSize) {
    // Projection shared by both queries; the column order must stay stable because the
    // rows are returned as positional Object[] arrays.
    final String selectFrom = "SELECT e.globalIndex, e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, "
            + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData "
            + "FROM " + domainEventEntryEntityName() + " e ";

    List<Object[]> events = new ArrayList<>();

    // 1) Rows that fill previously observed gaps.
    if (previousToken != null && !previousToken.getGaps().isEmpty()) {
        List<Object[]> gapResult = entityManager()
                .createQuery(selectFrom + "WHERE e.globalIndex IN :gaps ORDER BY e.globalIndex ASC", Object[].class)
                .setParameter("gaps", previousToken.getGaps())
                .setMaxResults(batchSize)
                .getResultList();
        events.addAll(gapResult);
    }

    // 2) Rows beyond the token's index, limited to whatever room is left in the batch.
    int remaining = batchSize - events.size();
    if (remaining <= 0) {
        // The batch is already full (or batchSize was not positive): never call
        // setMaxResults(0), which may be treated as "unbounded" by the JPA provider.
        return events;
    }

    List<Object[]> indexResult = entityManager()
            .createQuery(selectFrom + "WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC", Object[].class)
            .setParameter("token", previousToken == null ? -1L : previousToken.getIndex())
            .setMaxResults(remaining)
            .getResultList();
    events.addAll(indexResult);
    return events;
}
/**
 * Fetches the next batch of tracked events following {@code lastToken}.
 *
 * <p>The token must be {@code null} or a {@link GapAwareTrackingToken}; any other type fails the
 * assertion below. The raw rows are loaded by {@code getEvents} through
 * {@code transactionManager.fetchInTransaction}. The conversion of those rows into
 * {@code TrackedEventData} is elided in this snippet.
 *
 * @param lastToken the token of the last processed event, or {@code null}
 * @param batchSize the maximum number of events to fetch, passed through to {@code getEvents}
 * @return the tracked event data built from the fetched rows (construction not shown here)
 */
@Override
protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize) {
// Reject token types other than GapAwareTrackingToken up front so the cast below cannot fail.
Assert.isTrue(lastToken == null || lastToken instanceof GapAwareTrackingToken, () -> String.format("Token [%s] is of the wrong type. Expected [%s]", lastToken, GapAwareTrackingToken.class.getSimpleName()));
// NOTE(review): cleanedToken is defined elsewhere; presumably it prunes stale gaps from the token — confirm.
GapAwareTrackingToken previousToken = cleanedToken((GapAwareTrackingToken) lastToken);
// Run both queries of getEvents inside the transaction manager's fetch callback.
List<Object[]> entries = transactionManager.fetchInTransaction(() -> (getEvents(previousToken, batchSize)));
...// The remaining part is related conversion from Object to TrackedEventData
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment