@@ -28,9 +28,10 @@ public abstract class TransformEntityWorker<TDbContext, TReadingEntity, TWriting
28
28
protected async Task Transform (
29
29
Func < TDbContext > dbContextFactory ,
30
30
int saveByNthEntityCount ,
31
- Action < EntityEntry < TWritingEntity > > writingEntityEntryAction ,
32
31
Func < TReadingEntity , TExceptionId > readingEntityExceptionIdSelector ,
33
32
Func < TReadingEntity , TWritingEntity > entityTransformer ,
33
+ Action < EntityEntry < TWritingEntity > > writingEntityEntryAction ,
34
+ Action < TDbContext , IEnumerable < TWritingEntity > > writingEntitiesAction ,
34
35
CancellationToken stoppingToken = default )
35
36
{
36
37
var processedEntityCount = 0 ;
@@ -45,26 +46,27 @@ protected async Task Transform(
45
46
from e in readingDb . Set < TReadingEntity > ( ) . AsNoTracking ( ) select e ;
46
47
var writingEntities = new List < TWritingEntity > ( ) ;
47
48
48
- void SaveAndLog ( )
49
+ async Task SaveThenLog ( int processedCount , Process currentProcess )
49
50
{
50
- writingDb . Set < TWritingEntity > ( ) . UpdateRange ( writingEntities ) ;
51
+ writingDb . Set < TWritingEntity > ( ) . AttachRange ( writingEntities ) ;
51
52
writingDb . ChangeTracker . Entries < TWritingEntity > ( ) . ForEach ( writingEntityEntryAction ) ;
52
- var updatedEntityCount = writingDb . SaveChanges ( ) ;
53
+ writingEntitiesAction ( writingDb , writingEntities ) ;
54
+ var updatedEntityCount = await writingDb . SaveChangesAsync ( stoppingToken ) ;
53
55
writingEntities . Clear ( ) ;
54
56
writingDb . ChangeTracker . Clear ( ) ;
55
57
56
58
logger . LogTrace ( "processedEntityCount:{} updatedEntityCount:{} elapsed:{}ms processMemory:{}MiB exceptions:{}" ,
57
- processedEntityCount , updatedEntityCount ,
59
+ processedCount , updatedEntityCount ,
58
60
stopwatch . ElapsedMilliseconds ,
59
- process . PrivateMemorySize64 / 1024 / 1024 ,
61
+ currentProcess . PrivateMemorySize64 / 1024 / 1024 ,
60
62
JsonSerializer . Serialize ( exceptions , JsonSerializerOptions ) ) ;
61
63
stopwatch . Restart ( ) ;
62
64
}
63
65
64
66
foreach ( var readingEntity in readingEntities )
65
67
{
66
68
processedEntityCount ++ ;
67
- if ( processedEntityCount % saveByNthEntityCount == 0 ) SaveAndLog ( ) ;
69
+ if ( processedEntityCount % saveByNthEntityCount == 0 ) await SaveThenLog ( processedEntityCount , process ) ;
68
70
if ( stoppingToken . IsCancellationRequested ) break ;
69
71
try
70
72
{
@@ -85,6 +87,6 @@ void SaveAndLog()
85
87
}
86
88
}
87
89
88
- SaveAndLog ( ) ;
90
+ await SaveThenLog ( processedEntityCount , process ) ;
89
91
}
90
92
}
0 commit comments