///
/// Integration tests for the GetEventStoreRepository. These tests require a
/// running version of the Event Store, with a TCP endpoint as specified in the
/// IntegrationTestTcpEndPoint field (defaults to local loopback, port 1113).
///
[TestFixture]
public class GetEventStoreRepositoryIntegrationTests
{
///
/// Set this to the TCP endpoint on which the Event Store is running.
///
private static readonly IPEndPoint IntegrationTestTcpEndPoint = new IPEndPoint(IPAddress.Loopback, 1113);
private static Guid SaveTestAggregateWithoutCustomHeaders(IRepository repository, int numberOfEvents)
{
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(numberOfEvents);
repository.Save(aggregateToSave, Guid.NewGuid(), d => { });
return aggregateToSave.Id;
}
[Test]
public void ClearsEventsFromAggregateOnceCommitted()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(10);
repo.Save(aggregateToSave, Guid.NewGuid(), d => { });
Assert.AreEqual(0, ((IAggregate)aggregateToSave).GetUncommittedEvents().Count);
}
[Test]
public void CanGetLatestVersionById()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 3000 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById(savedId);
Assert.AreEqual(3000, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanGetSpecificVersionFromFirstPageById()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById(savedId, 65);
Assert.AreEqual(64, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanGetSpecificVersionFromSubsequentPageById()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 500 /* excludes TestAggregateCreated */);
var retrieved = repo.GetById(savedId, 126);
Assert.AreEqual(125, retrieved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanSaveExistingAggregate()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 100 /* excludes TestAggregateCreated */);
var firstSaved = repo.GetById(savedId);
firstSaved.ProduceEvents(50);
repo.Save(firstSaved, Guid.NewGuid(), d => { });
var secondSaved = repo.GetById(savedId);
Assert.AreEqual(150, secondSaved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanSaveMultiplesOfWritePageSize()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var savedId = SaveTestAggregateWithoutCustomHeaders(repo, 1500 /* excludes TestAggregateCreated */);
var saved = repo.GetById(savedId);
Assert.AreEqual(1500, saved.AppliedEventCount);
connection.Close();
}
[Test]
public void GetsEventsFromCorrectStreams()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var aggregate1Id = SaveTestAggregateWithoutCustomHeaders(repo, 100);
var aggregate2Id = SaveTestAggregateWithoutCustomHeaders(repo, 50);
var firstSaved = repo.GetById(aggregate1Id);
Assert.AreEqual(100, firstSaved.AppliedEventCount);
var secondSaved = repo.GetById(aggregate2Id);
Assert.AreEqual(50, secondSaved.AppliedEventCount);
connection.Close();
}
[Test]
public void CanHandleLargeNumberOfEventsInOneTransaction()
{
const int numberOfEvents = 50000;
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var aggregateId = SaveTestAggregateWithoutCustomHeaders(repo, numberOfEvents);
var saved = repo.GetById(aggregateId);
Assert.AreEqual(numberOfEvents, saved.AppliedEventCount);
connection.Close();
}
[Test]
public void SavesCommitHeadersOnEachEvent()
{
var connection = EventStoreConnection.Create();
connection.Connect(IntegrationTestTcpEndPoint)
var repo = new GetEventStoreRepository(connection);
var commitId = Guid.NewGuid();
var aggregateToSave = new TestAggregate(Guid.NewGuid());
aggregateToSave.ProduceEvents(20);
repo.Save(aggregateToSave, commitId, d =>
{
d.Add("CustomHeader1", "CustomValue1");
d.Add("CustomHeader2", "CustomValue2");
});
var read = connection.ReadStreamEventsForward(string.Format("aggregate-{0}", aggregateToSave.Id), 1, 20, false);
foreach (var serializedEvent in read.Events)
{
var parsedMetadata = JObject.Parse(Encoding.UTF8.GetString(serializedEvent.OriginalEvent.Metadata));
var deserializedCommitId = parsedMetadata.Property("CommitId").Value.ToObject();
Assert.AreEqual(commitId, deserializedCommitId);
var deserializedCustomHeader1 = parsedMetadata.Property("CustomHeader1").Value.ToObject();
Assert.AreEqual("CustomValue1", deserializedCustomHeader1);
var deserializedCustomHeader2 = parsedMetadata.Property("CustomHeader2").Value.ToObject();
Assert.AreEqual("CustomValue2", deserializedCustomHeader2);
}
connection.Close();
}
}