Skip to content

Commit da124a7

Browse files
authored
Merge pull request #299 from FoundatioFx/bug/cache-lock-provider-acquiring-mutliple-locks
2 parents 55da2bd + 40e03ab commit da124a7

32 files changed

+439
-301
lines changed

.github/workflows/build-workflow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
run: "echo ref: ${{github.ref}} event: ${{github.event_name}}"
4343
- name: Build Version
4444
run: |
45-
dotnet tool install --global minver-cli --version 4.3.0
45+
dotnet tool install --global minver-cli --version 5.0.0
4646
version=$(minver --tag-prefix v)
4747
echo "MINVERVERSIONOVERRIDE=$version" >> $GITHUB_ENV
4848
echo "### Version: $version" >> $GITHUB_STEP_SUMMARY

.gitignore

+1-20
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,4 @@ _NCrunch_*
3232
.DS_Store
3333

3434
# Rider
35-
36-
# User specific
37-
**/.idea/**/workspace.xml
38-
**/.idea/**/tasks.xml
39-
**/.idea/shelf/*
40-
**/.idea/dictionaries
41-
42-
# Sensitive or high-churn files
43-
**/.idea/**/dataSources/
44-
**/.idea/**/dataSources.ids
45-
**/.idea/**/dataSources.xml
46-
**/.idea/**/dataSources.local.xml
47-
**/.idea/**/sqlDataSources.xml
48-
**/.idea/**/dynamic.xml
49-
50-
# Rider
51-
# Rider auto-generates .iml files, and contentModel.xml
52-
**/.idea/**/*.iml
53-
**/.idea/**/contentModel.xml
54-
**/.idea/**/modules.xml
35+
.idea

build/common.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
<ItemGroup>
3939
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All"/>
4040
<PackageReference Include="AsyncFixer" Version="1.6.0" PrivateAssets="All" />
41-
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="All" />
41+
<PackageReference Include="MinVer" Version="5.0.0" PrivateAssets="All" />
4242
</ItemGroup>
4343

4444
<ItemGroup>

src/Foundatio.Extensions.Hosting/Jobs/ScheduledJobService.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class ScheduledJobService : BackgroundService, IJobStatus
2424
public ScheduledJobService(IServiceProvider serviceProvider, ILoggerFactory loggerFactory)
2525
{
2626
_serviceProvider = serviceProvider;
27-
var cacheClient = serviceProvider.GetService<ICacheClient>() ?? new InMemoryCacheClient();
27+
var cacheClient = serviceProvider.GetService<ICacheClient>() ?? new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory));
2828
_jobs = new List<ScheduledJobRunner>(serviceProvider.GetServices<ScheduledJobRegistration>().Select(j => new ScheduledJobRunner(j.JobFactory, j.Schedule, cacheClient, loggerFactory)));
2929

3030
var lifetime = serviceProvider.GetService<ShutdownHostIfNoJobsRunningService>();

src/Foundatio.TestHarness/Caching/CacheClientTestsBase.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,8 @@ public virtual async Task CanAddConcurrentlyAsync()
210210

211211
string cacheKey = Guid.NewGuid().ToString("N").Substring(10);
212212
long adds = 0;
213-
await Run.InParallelAsync(5, async i =>
213+
214+
await Parallel.ForEachAsync(Enumerable.Range(1, 5), async (i, _) =>
214215
{
215216
if (await cache.AddAsync(cacheKey, i, TimeSpan.FromMinutes(1)))
216217
Interlocked.Increment(ref adds);

src/Foundatio.TestHarness/Caching/HybridCacheClientTests.cs

+8-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@ namespace Foundatio.Tests.Caching;
1313

1414
public class HybridCacheClientTests : CacheClientTestsBase, IDisposable
1515
{
16-
protected readonly ICacheClient _distributedCache = new InMemoryCacheClient(new InMemoryCacheClientOptions());
17-
protected readonly IMessageBus _messageBus = new InMemoryMessageBus(new InMemoryMessageBusOptions());
16+
protected readonly ICacheClient _distributedCache;
17+
protected readonly IMessageBus _messageBus;
1818

19-
public HybridCacheClientTests(ITestOutputHelper output) : base(output) { }
19+
public HybridCacheClientTests(ITestOutputHelper output) : base(output)
20+
{
21+
_distributedCache = new InMemoryCacheClient(o => o.LoggerFactory(Log));
22+
_messageBus = new InMemoryMessageBus(o => o.LoggerFactory(Log));
23+
}
2024

2125
protected override ICacheClient GetCacheClient(bool shouldThrowOnSerializationError = true)
2226
{
23-
return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError }, Log);
27+
return new HybridCacheClient(_distributedCache, _messageBus, new InMemoryCacheClientOptions { CloneValues = true, ShouldThrowOnSerializationError = shouldThrowOnSerializationError, LoggerFactory = Log }, Log);
2428
}
2529

2630
[Fact]

src/Foundatio.TestHarness/Jobs/JobQueueTestsBase.cs

+7-7
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public virtual async Task CanRunQueueJobAsync()
7474
using var queue = GetSampleWorkItemQueue(retries: 0, retryDelay: TimeSpan.Zero);
7575
await queue.DeleteQueueAsync();
7676

77-
var enqueueTask = Run.InParallelAsync(workItemCount, index => queue.EnqueueAsync(new SampleQueueWorkItem
77+
var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) => await queue.EnqueueAsync(new SampleQueueWorkItem
7878
{
7979
Created = SystemClock.UtcNow,
8080
Path = "somepath" + index
@@ -99,17 +99,17 @@ public virtual async Task CanRunQueueJobWithLockFailAsync()
9999
using var queue = GetSampleWorkItemQueue(retries: 3, retryDelay: TimeSpan.Zero);
100100
await queue.DeleteQueueAsync();
101101

102-
var enqueueTask = Run.InParallelAsync(workItemCount, index =>
102+
var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (index, _) =>
103103
{
104104
_logger.LogInformation($"Enqueue #{index}");
105-
return queue.EnqueueAsync(new SampleQueueWorkItem
105+
await queue.EnqueueAsync(new SampleQueueWorkItem
106106
{
107107
Created = SystemClock.UtcNow,
108108
Path = "somepath" + index
109109
});
110110
});
111111

112-
var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions()), allowedLockCount, TimeSpan.FromDays(1), Log);
112+
var lockProvider = new ThrottlingLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(Log)), allowedLockCount, TimeSpan.FromDays(1), Log);
113113
var job = new SampleQueueJobWithLocking(queue, null, lockProvider, Log);
114114
await SystemClock.SleepAsync(10);
115115
_logger.LogInformation("Starting RunUntilEmptyAsync");
@@ -147,10 +147,10 @@ public virtual async Task CanRunMultipleQueueJobsAsync()
147147
}
148148
_logger.LogInformation("Done setting up queues");
149149

150-
var enqueueTask = Run.InParallelAsync(workItemCount, index =>
150+
var enqueueTask = Parallel.ForEachAsync(Enumerable.Range(1, workItemCount), async (_, _) =>
151151
{
152152
var queue = queues[RandomData.GetInt(0, jobCount - 1)];
153-
return queue.EnqueueAsync(new SampleQueueWorkItem
153+
await queue.EnqueueAsync(new SampleQueueWorkItem
154154
{
155155
Created = SystemClock.UtcNow,
156156
Path = RandomData.GetString()
@@ -159,7 +159,7 @@ public virtual async Task CanRunMultipleQueueJobsAsync()
159159
_logger.LogInformation("Done enqueueing");
160160

161161
var cancellationTokenSource = new CancellationTokenSource();
162-
await Run.InParallelAsync(jobCount, async index =>
162+
await Parallel.ForEachAsync(Enumerable.Range(1, jobCount), async (index, _) =>
163163
{
164164
var queue = queues[index - 1];
165165
var job = new SampleQueueWithRandomErrorsAndAbandonsJob(queue, metrics, Log);

src/Foundatio.TestHarness/Jobs/ThrottledJob.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ public ThrottledJob(ICacheClient client, ILoggerFactory loggerFactory = null) :
1818
private readonly ILockProvider _locker;
1919
public int RunCount { get; set; }
2020

21-
protected override Task<ILock> GetLockAsync(CancellationToken cancellationToken = default(CancellationToken))
21+
protected override Task<ILock> GetLockAsync(CancellationToken cancellationToken = default)
2222
{
2323
return _locker.AcquireAsync(nameof(ThrottledJob), acquireTimeout: TimeSpan.Zero);
2424
}
2525

2626
protected override Task<JobResult> RunInternalAsync(JobContext context)
2727
{
2828
RunCount++;
29-
29+
_logger.LogDebug("Incremented Run Count: {RunCount}", RunCount);
3030
return Task.FromResult(JobResult.Success);
3131
}
3232
}

src/Foundatio.TestHarness/Jobs/WithLockingJob.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class WithLockingJob : JobWithLockBase
1717

1818
public WithLockingJob(ILoggerFactory loggerFactory) : base(loggerFactory)
1919
{
20-
_locker = new CacheLockProvider(new InMemoryCacheClient(new InMemoryCacheClientOptions { LoggerFactory = loggerFactory }), new InMemoryMessageBus(new InMemoryMessageBusOptions { LoggerFactory = loggerFactory }), loggerFactory);
20+
_locker = new CacheLockProvider(new InMemoryCacheClient(o => o.LoggerFactory(loggerFactory)), new InMemoryMessageBus(o => o.LoggerFactory(loggerFactory)), loggerFactory);
2121
}
2222

2323
public int RunCount { get; set; }

src/Foundatio.TestHarness/Locks/LockTestBase.cs

+73-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
44
using System.Linq;
@@ -55,8 +55,7 @@ public virtual async Task CanAcquireAndReleaseLockAsync()
5555

5656
int counter = 0;
5757

58-
bool isTraceLogLevelEnabled = _logger.IsEnabled(LogLevel.Trace);
59-
await Run.InParallelAsync(25, async i =>
58+
await Parallel.ForEachAsync(Enumerable.Range(1, 25), async (_, _) =>
6059
{
6160
bool success = await locker.TryUsingAsync("test", () =>
6261
{
@@ -183,14 +182,50 @@ public virtual async Task CanAcquireLocksInParallel()
183182
if (locker == null)
184183
return;
185184

185+
Log.SetLogLevel<CacheLockProvider>(LogLevel.Trace);
186+
187+
const int COUNT = 100;
188+
int current = 1;
189+
var used = new List<int>();
190+
int concurrency = 0;
191+
192+
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
193+
{
194+
await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
195+
Assert.NotNull(myLock);
196+
197+
int currentConcurrency = Interlocked.Increment(ref concurrency);
198+
Assert.Equal(1, currentConcurrency);
199+
200+
int item = current;
201+
await Task.Delay(10, ct);
202+
used.Add(item);
203+
current++;
204+
205+
Interlocked.Decrement(ref concurrency);
206+
});
207+
208+
var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1);
209+
Assert.Empty(duplicates);
210+
Assert.Equal(COUNT, used.Count);
211+
}
212+
213+
public virtual async Task CanAcquireScopedLocksInParallel()
214+
{
215+
var lockProvider = GetLockProvider();
216+
if (lockProvider == null)
217+
return;
218+
219+
var locker = new ScopedLockProvider(lockProvider, "scoped");
220+
186221
Log.SetLogLevel<CacheLockProvider>(LogLevel.Debug);
187222

188223
const int COUNT = 100;
189224
int current = 1;
190225
var used = new List<int>();
191226
int concurrency = 0;
192227

193-
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) =>
228+
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
194229
{
195230
await using var myLock = await locker.AcquireAsync("test", TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
196231
Assert.NotNull(myLock);
@@ -211,6 +246,40 @@ await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (index, ct) =>
211246
Assert.Equal(COUNT, used.Count);
212247
}
213248

249+
public virtual async Task CanAcquireMultipleLocksInParallel()
250+
{
251+
var locker = GetLockProvider();
252+
if (locker == null)
253+
return;
254+
255+
Log.SetLogLevel<CacheLockProvider>(LogLevel.Debug);
256+
257+
const int COUNT = 100;
258+
int current = 1;
259+
var used = new List<int>();
260+
int concurrency = 0;
261+
262+
await Parallel.ForEachAsync(Enumerable.Range(1, COUNT), async (_, ct) =>
263+
{
264+
await using var myLock = await locker.AcquireAsync(["test"], TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
265+
Assert.NotNull(myLock);
266+
267+
int currentConcurrency = Interlocked.Increment(ref concurrency);
268+
Assert.Equal(1, currentConcurrency);
269+
270+
int item = current;
271+
await Task.Delay(10, ct);
272+
used.Add(item);
273+
current++;
274+
275+
Interlocked.Decrement(ref concurrency);
276+
});
277+
278+
var duplicates = used.GroupBy(x => x).Where(g => g.Count() > 1);
279+
Assert.Empty(duplicates);
280+
Assert.Equal(COUNT, used.Count);
281+
}
282+
214283
public virtual async Task LockOneAtATimeAsync()
215284
{
216285
var locker = GetLockProvider();
@@ -271,7 +340,6 @@ private Task<bool> DoLockedWorkAsync(ILockProvider locker)
271340

272341
public virtual async Task WillThrottleCallsAsync()
273342
{
274-
Log.DefaultMinimumLevel = LogLevel.Trace;
275343
Log.SetLogLevel<ScheduledTimer>(LogLevel.Information);
276344
Log.SetLogLevel<ThrottlingLockProvider>(LogLevel.Trace);
277345

src/Foundatio.TestHarness/Messaging/MessageBusTestBase.cs

+17-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics;
4+
using System.Linq;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using Exceptionless;
@@ -248,7 +249,7 @@ await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
248249
});
249250

250251
var sw = Stopwatch.StartNew();
251-
await Run.InParallelAsync(numConcurrentMessages, async i =>
252+
await Parallel.ForEachAsync(Enumerable.Range(1, numConcurrentMessages), async (i, _) =>
252253
{
253254
await messageBus.PublishAsync(new SimpleMessageA
254255
{
@@ -282,16 +283,16 @@ public virtual async Task CanSubscribeConcurrentlyAsync()
282283
try
283284
{
284285
var countdown = new AsyncCountdownEvent(iterations * 10);
285-
await Run.InParallelAsync(10, i =>
286+
await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) =>
286287
{
287-
return messageBus.SubscribeAsync<SimpleMessageA>(msg =>
288+
await messageBus.SubscribeAsync<SimpleMessageA>(msg =>
288289
{
289290
Assert.Equal("Hello", msg.Data);
290291
countdown.Signal();
291-
});
292+
}, cancellationToken: ct);
292293
});
293294

294-
await Run.InParallelAsync(iterations, i => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }));
295+
await Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (_, _) => await messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }));
295296
await countdown.WaitAsync(TimeSpan.FromSeconds(2));
296297
Assert.Equal(0, countdown.CurrentCount);
297298
}
@@ -312,27 +313,27 @@ public virtual async Task CanReceiveMessagesConcurrentlyAsync()
312313
try
313314
{
314315
var countdown = new AsyncCountdownEvent(iterations * 10);
315-
await Run.InParallelAsync(10, async i =>
316+
await Parallel.ForEachAsync(Enumerable.Range(1, 10), async (_, ct) =>
316317
{
317318
var bus = GetMessageBus();
318319
await bus.SubscribeAsync<SimpleMessageA>(msg =>
319320
{
320321
Assert.Equal("Hello", msg.Data);
321322
countdown.Signal();
322-
});
323+
}, cancellationToken: ct);
323324

324325
messageBuses.Add(bus);
325326
});
326-
var subscribe = Run.InParallelAsync(iterations,
327-
i =>
328-
{
329-
SystemClock.Sleep(RandomData.GetInt(0, 10));
330-
return messageBuses.Random().SubscribeAsync<NeverPublishedMessage>(msg => Task.CompletedTask);
331-
});
332327

333-
var publish = Run.InParallelAsync(iterations + 3, i =>
328+
var subscribe = Parallel.ForEachAsync(Enumerable.Range(1, iterations), async (i, ct) =>
334329
{
335-
return i switch
330+
await SystemClock.SleepAsync(RandomData.GetInt(0, 10), ct);
331+
await messageBuses.Random().SubscribeAsync<NeverPublishedMessage>(msg => Task.CompletedTask, cancellationToken: ct);
332+
});
333+
334+
var publish = Parallel.ForEachAsync(Enumerable.Range(1, iterations + 3), async (i, _) =>
335+
{
336+
await (i switch
336337
{
337338
1 => messageBus.PublishAsync(new DerivedSimpleMessageA { Data = "Hello" }),
338339
2 => messageBus.PublishAsync(new Derived2SimpleMessageA { Data = "Hello" }),
@@ -348,7 +349,7 @@ await bus.SubscribeAsync<SimpleMessageA>(msg =>
348349
iterations + 2 => messageBus.PublishAsync(new SimpleMessageC { Data = "Hello" }),
349350
iterations + 3 => messageBus.PublishAsync(new SimpleMessageB { Data = "Hello" }),
350351
_ => messageBus.PublishAsync(new SimpleMessageA { Data = "Hello" }),
351-
};
352+
});
352353
});
353354

354355
await Task.WhenAll(subscribe, publish);

0 commit comments

Comments
 (0)