Skip to content

Commit

Permalink
Azure 이벤트 저장소 동시성 문제를 해결한다
Browse files Browse the repository at this point in the history
기존 이벤트 저장소 파티션 구성이 가진 동시성 문제를 해결하고 일부 성능을
개선한다.

Issue: #28
  • Loading branch information
gyuwon committed Jan 7, 2018
1 parent fe8120d commit 21f3ea9
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ private Task FlushEvents(T source, CancellationToken cancellationToken)

private Task SaveMementoIfPossible(T source, CancellationToken cancellationToken)
{
if (_mementoStore != null &&
source is IMementoOriginator mementoOriginator)
if (_mementoStore != null && source is IMementoOriginator mementoOriginator)
{
IMemento memento = mementoOriginator.SaveToMemento();
return _mementoStore.Save<T>(source.Id, memento, cancellationToken);
Expand All @@ -100,8 +99,7 @@ public Task<T> Find(Guid sourceId, CancellationToken cancellationToken = default
{
if (sourceId == Guid.Empty)
{
throw new ArgumentException(
$"{nameof(sourceId)} cannot be empty.", nameof(sourceId));
throw new ArgumentException($"{nameof(sourceId)} cannot be empty.", nameof(sourceId));
}

return PublishAndRestore(sourceId, cancellationToken);
Expand All @@ -110,16 +108,21 @@ public Task<T> Find(Guid sourceId, CancellationToken cancellationToken = default
private async Task<T> PublishAndRestore(
Guid sourceId, CancellationToken cancellationToken)
{
await _eventPublisher
.FlushPendingEvents<T>(sourceId, cancellationToken)
.ConfigureAwait(false);
T eventSourced = default;
async Task Rehydrate() => eventSourced = await Restore(sourceId, cancellationToken).ConfigureAwait(false);
await Task.WhenAll(Publish(sourceId, cancellationToken), Rehydrate()).ConfigureAwait(false);
return eventSourced;
}

private Task Publish(Guid sourceId, CancellationToken cancellationToken)
=> _eventPublisher.FlushPendingEvents<T>(sourceId, cancellationToken);

private async Task<T> Restore(Guid sourceId, CancellationToken cancellationToken)
{
IMemento memento = null;
if (_mementoStore != null && _mementoEntityFactory != null)
{
memento = await _mementoStore
.Find<T>(sourceId, cancellationToken)
.ConfigureAwait(false);
memento = await _mementoStore.Find<T>(sourceId, cancellationToken).ConfigureAwait(false);
}

IEnumerable<IDomainEvent> domainEvents = await _eventStore
Expand All @@ -128,9 +131,7 @@ await _eventPublisher

return
memento == null
? domainEvents.Any()
? _entityFactory.Invoke(sourceId, domainEvents)
: default
? domainEvents.Any() ? _entityFactory.Invoke(sourceId, domainEvents) : default
: _mementoEntityFactory.Invoke(sourceId, memento, domainEvents);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ public class AzureMementoStore : IMementoStore
private readonly CloudBlobContainer _container;
private readonly IMessageSerializer _serializer;

public AzureMementoStore(
CloudBlobContainer container,
IMessageSerializer serializer)
public AzureMementoStore(CloudBlobContainer container, IMessageSerializer serializer)
{
_container = container ?? throw new ArgumentNullException(nameof(container));
_serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
public class AzureEventPublisher_specs
{
private static IMessageSerializer s_serializer;
private static CloudStorageAccount s_storageAccount;
private static CloudTable s_eventTable;

[ClassInitialize]
Expand All @@ -31,8 +30,7 @@ public static async Task ClassInitialize(TestContext context)

try
{
s_storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
CloudTableClient tableClient = s_storageAccount.CreateCloudTableClient();
CloudTableClient tableClient = CloudStorageAccount.DevelopmentStorageAccount.CreateCloudTableClient();
s_eventTable = tableClient.GetTableReference("AzureEventPublisherTestEventStore");
await s_eventTable.DeleteIfExistsAsync(
new TableRequestOptions { RetryPolicy = new NoRetry() },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
namespace Khala.EventSourcing.Azure
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using AutoFixture;
using AutoFixture.AutoMoq;
using FluentAssertions;
using Khala.FakeDomain;
using Khala.Messaging;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.RetryPolicies;
using Microsoft.WindowsAzure.Storage.Table;
using Moq;
using Ploeh.AutoFixture;
using Ploeh.AutoFixture.AutoMoq;

[TestClass]
public class AzureEventSourcedRepository_specs
Expand Down Expand Up @@ -298,93 +292,5 @@ public async Task Find_restores_aggregate_using_memento_if_found()
Mock.Get(_eventStore).Verify();
actual.ShouldBeEquivalentTo(user);
}

[TestMethod]
public async Task SaveAndPublish_is_concurrency_safe()
{
// Arrange
CloudTable eventTable = InitializeEventTable("AzureEventSourcedRepositoryConcurrencyTest");
var serializer = new JsonMessageSerializer();
var messageBus = new MessageBus();
var eventStore = new AzureEventStore(eventTable, serializer);
var eventPublisher = new AzureEventPublisher(eventTable, serializer, messageBus);
var sut = new AzureEventSourcedRepository<FakeUser>(eventStore, eventPublisher, FakeUser.Factory);
var userId = Guid.NewGuid();
await sut.SaveAndPublish(new FakeUser(userId, Guid.NewGuid().ToString()));

// Act
async Task Process()
{
for (int i = 0; i < 10; i++)
{
try
{
FakeUser user = await sut.Find(userId);
user.ChangeUsername(Guid.NewGuid().ToString());
user.ChangeUsername(Guid.NewGuid().ToString());
user.ChangeUsername(Guid.NewGuid().ToString());
await sut.SaveAndPublish(user);
}
catch
{
}
}
}

await Task.WhenAll(Enumerable.Range(0, 3).Select(_ => Process()));
await Task.Delay(millisecondsDelay: 100);

// Assert
IEnumerable<IDomainEvent> expected = await eventStore.LoadEvents<FakeUser>(userId);
var actual = messageBus.Log.Select(e => (IDomainEvent)e.Message).Distinct(e => e.Version).OrderBy(e => e.Version).ToList();
actual.ShouldAllBeEquivalentTo(expected, opts => opts.WithStrictOrdering().Excluding(e => e.RaisedAt));
}

private CloudTable InitializeEventTable(string eventTableName)
{
CloudTable eventTable = default;

try
{
CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
eventTable = tableClient.GetTableReference(eventTableName);
eventTable.DeleteIfExists(new TableRequestOptions { RetryPolicy = new NoRetry() });
eventTable.Create();
}
catch (StorageException exception)
when (exception.InnerException is WebException)
{
TestContext.WriteLine("{0}", exception);
Assert.Inconclusive("Could not connect to Azure Storage Emulator. See the output for details. Refer to the following URL for more information: http://go.microsoft.com/fwlink/?LinkId=392237");
}

return eventTable;
}

private class MessageBus : IMessageBus
{
private ConcurrentQueue<Envelope> _log = new ConcurrentQueue<Envelope>();

public IEnumerable<Envelope> Log => _log;

public Task Send(Envelope envelope, CancellationToken cancellationToken)
{
Task.Factory.StartNew(() => _log.Enqueue(envelope));
return Task.CompletedTask;
}

public Task Send(IEnumerable<Envelope> envelopes, CancellationToken cancellationToken)
{
Task.Factory.StartNew(() =>
{
foreach (Envelope envelope in envelopes)
{
_log.Enqueue(envelope);
}
});
return Task.CompletedTask;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
[TestClass]
public class AzureEventStore_specs
{
private static CloudStorageAccount s_storageAccount;
private static CloudTable s_eventTable;
private IMessageSerializer _serializer;
private AzureEventStore _sut;
Expand All @@ -28,8 +27,7 @@ public static async Task ClassInitialize(TestContext context)
{
try
{
s_storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
CloudTableClient tableClient = s_storageAccount.CreateCloudTableClient();
CloudTableClient tableClient = CloudStorageAccount.DevelopmentStorageAccount.CreateCloudTableClient();
s_eventTable = tableClient.GetTableReference("AzureEventStoreTestEventStore");
await s_eventTable.DeleteIfExistsAsync(
new TableRequestOptions { RetryPolicy = new NoRetry() },
Expand Down
Loading

0 comments on commit 21f3ea9

Please sign in to comment.