Skip to content

Instantly share code, notes, and snippets.

@altuntasfatih
Last active June 12, 2022 20:36
Show Gist options
  • Select an option

  • Save altuntasfatih/cbefa8c25da5f9e2c0f47efef65947d9 to your computer and use it in GitHub Desktop.

Select an option

Save altuntasfatih/cbefa8c25da5f9e2c0f47efef65947d9 to your computer and use it in GitHub Desktop.
fetchTrackedEvents method
/**
 * Reads the next batch of raw event rows that follow the given tracking token.
 *
 * <p>The token must be either {@code null} or a {@link GapAwareTrackingToken}; this is
 * checked via {@code Assert.isTrue} before anything else happens. The query selects rows
 * whose {@code globalIndex} is greater than the token's index ({@code -1} when there is no
 * token) and, when the token reports gaps, also rows whose {@code globalIndex} is one of
 * those gaps. Rows are ordered by ascending {@code globalIndex} and limited to
 * {@code batchSize}.
 *
 * @param lastToken the token to continue from, or {@code null} to start from the beginning
 * @param batchSize the maximum number of rows requested from the query
 * @return the tracked events built from the fetched rows (the conversion step is elided
 *         from this excerpt)
 */
@Override
protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize) {
    Assert.isTrue(
            lastToken == null || lastToken instanceof GapAwareTrackingToken,
            () -> String.format("Token [%s] is of the wrong type. Expected [%s]",
                                lastToken, GapAwareTrackingToken.class.getSimpleName())
    );
    // NOTE(review): the original remark here read "if there are many gaps, it is worthwhile
    // checking if it is possible to clean them up" - presumably that is what cleanedToken
    // does; confirm against its definition.
    GapAwareTrackingToken previousToken = cleanedToken((GapAwareTrackingToken) lastToken);
    List<Object[]> entries = transactionManager.fetchInTransaction(() -> {
        // Only the WHERE clause and the extra "gaps" parameter depend on whether gaps exist.
        final boolean hasGaps = previousToken != null && !previousToken.getGaps().isEmpty();
        final String whereClause = hasGaps
                ? "WHERE e.globalIndex > :token OR e.globalIndex IN :gaps ORDER BY e.globalIndex ASC"
                : "WHERE e.globalIndex > :token ORDER BY e.globalIndex ASC";

        TypedQuery<Object[]> query = entityManager().createQuery(
                "SELECT e.globalIndex, e.type, e.aggregateIdentifier, e.sequenceNumber, e.eventIdentifier, "
                        + "e.timeStamp, e.payloadType, e.payloadRevision, e.payload, e.metaData "
                        + "FROM " + domainEventEntryEntityName() + " e "
                        + whereClause,
                Object[].class
        );
        if (hasGaps) {
            query = query.setParameter("gaps", previousToken.getGaps());
        }

        // Without a previous token, start below the first possible index.
        final long lowerBound = previousToken == null ? -1L : previousToken.getIndex();
        query = query.setParameter("token", lowerBound);
        query = query.setMaxResults(batchSize);
        return query.getResultList();
    });
    ..// The remaining part is related to conversion from Object to TrackedEventData.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment