Skip to content

Commit 0518b22

Browse files
refactor and add tests
Signed-off-by: Juraj Filan <[email protected]>
1 parent 62a223c commit 0518b22

File tree

3 files changed

+71
-46
lines changed

3 files changed

+71
-46
lines changed

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt

+1-20
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,11 @@ package org.mobilenativefoundation.store.store5.impl
44

55
import co.touchlab.kermit.CommonWriter
66
import co.touchlab.kermit.Logger
7-
import kotlinx.coroutines.channels.Channel
87
import kotlinx.coroutines.flow.Flow
9-
import kotlinx.coroutines.flow.emitAll
10-
import kotlinx.coroutines.flow.filter
118
import kotlinx.coroutines.flow.first
129
import kotlinx.coroutines.flow.flow
1310
import kotlinx.coroutines.flow.flowOf
14-
import kotlinx.coroutines.flow.map
15-
import kotlinx.coroutines.flow.merge
1611
import kotlinx.coroutines.flow.onEach
17-
import kotlinx.coroutines.flow.receiveAsFlow
1812
import kotlinx.coroutines.sync.Mutex
1913
import kotlinx.coroutines.sync.withLock
2014
import org.mobilenativefoundation.store.store5.Bookkeeper
@@ -23,7 +17,6 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
2317
import org.mobilenativefoundation.store.store5.MutableStore
2418
import org.mobilenativefoundation.store.store5.StoreReadRequest
2519
import org.mobilenativefoundation.store.store5.StoreReadResponse
26-
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
2720
import org.mobilenativefoundation.store.store5.StoreWriteRequest
2821
import org.mobilenativefoundation.store.store5.StoreWriteResponse
2922
import org.mobilenativefoundation.store.store5.Updater
@@ -45,8 +38,6 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
4538
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, Output, *>>()
4639
private val keyToThreadSafety = mutableMapOf<Key, ThreadSafety>()
4740

48-
private val writeRequestChannel = Channel<Pair<Key, Output>>()
49-
5041
override fun <Response : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
5142
flow {
5243
safeInitStore(request.key)
@@ -69,14 +60,7 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
6960
}
7061
}
7162

72-
emitAll(
73-
merge(
74-
delegate.stream(request),
75-
writeRequestChannel.receiveAsFlow()
76-
.filter { it.first == request.key }
77-
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) },
78-
)
79-
)
63+
delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) }
8064
}
8165

8266
@ExperimentalStoreApi
@@ -90,9 +74,6 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
9074
.collect { writeRequest ->
9175
val storeWriteResponse = try {
9276
delegate.write(writeRequest.key, writeRequest.value)
93-
if (!delegate.hasSourceOfTruth()) {
94-
writeRequestChannel.trySend(writeRequest.key to writeRequest.value)
95-
}
9677
when (val updaterResult = tryUpdateServer(writeRequest)) {
9778
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
9879
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)

store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt

+19-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.filter
2626
import kotlinx.coroutines.flow.first
2727
import kotlinx.coroutines.flow.flow
2828
import kotlinx.coroutines.flow.map
29+
import kotlinx.coroutines.flow.merge
2930
import kotlinx.coroutines.flow.onEach
3031
import kotlinx.coroutines.flow.onStart
3132
import kotlinx.coroutines.flow.receiveAsFlow
@@ -76,7 +77,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
7677
converter = converter
7778
)
7879

79-
private val localOnlyChannel = Channel<Pair<Key, Output>>()
80+
private val writeRequestChannel = Channel<Pair<Key, Output>>()
81+
private val localOnlyRequestChannel = Channel<Pair<Key, Output>>()
8082

8183
@Suppress("UNCHECKED_CAST")
8284
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
@@ -105,7 +107,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
105107
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
106108
}
107109
emitAll(
108-
localOnlyChannel.receiveAsFlow()
110+
localOnlyRequestChannel.receiveAsFlow()
109111
.filter { it.first == request.key }
110112
.map {
111113
StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache)
@@ -119,11 +121,18 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
119121
val piggybackOnly = !request.refresh && cachedToEmit != null
120122
@Suppress("UNCHECKED_CAST")
121123

122-
createNetworkFlow(
124+
val networkFlow = createNetworkFlow(
123125
request = request,
124126
networkLock = null,
125127
piggybackOnly = piggybackOnly
126128
) as Flow<StoreReadResponse<Output>> // when no source of truth Input == Output
129+
130+
merge(
131+
networkFlow,
132+
writeRequestChannel.receiveAsFlow()
133+
.filter { writeRequest -> writeRequest.first == request.key }
134+
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) }
135+
)
127136
} else if (request.fetch) {
128137
diskNetworkCombined(request, sourceOfTruth)
129138
} else {
@@ -177,7 +186,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
177186
}
178187
}
179188
if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) {
180-
localOnlyChannel.trySend(request.key to it.value)
189+
localOnlyRequestChannel.trySend(request.key to it.value)
181190
}
182191
}
183192

@@ -339,7 +348,11 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
339348

340349
internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try {
341350
memCache?.put(key, value)
342-
sourceOfTruth?.write(key, converter.fromOutputToLocal(value))
351+
if (sourceOfTruth != null) {
352+
sourceOfTruth.write(key, converter.fromOutputToLocal(value))
353+
} else {
354+
writeRequestChannel.trySend(key to value)
355+
}
343356
StoreDelegateWriteResult.Success
344357
} catch (error: Throwable) {
345358
StoreDelegateWriteResult.Error.Exception(error)
@@ -348,8 +361,6 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
348361
internal suspend fun latestOrNull(key: Key): Output? =
349362
fromMemCache(key) ?: fromSourceOfTruth(key)
350363

351-
internal fun hasSourceOfTruth() = sourceOfTruth != null
352-
353364
private suspend fun fromSourceOfTruth(key: Key) =
354365
sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()
355366

@@ -362,3 +373,4 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
362373
}
363374
}
364375
}
376+

store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt

+51-19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import kotlin.test.Test
66
import kotlin.test.assertEquals
77
import kotlin.test.assertIs
88
import kotlin.test.assertNotNull
9+
import kotlin.test.assertTrue
910
import kotlin.time.Duration.Companion.minutes
1011
import kotlinx.coroutines.flow.first
1112
import kotlinx.coroutines.flow.flow
@@ -270,59 +271,90 @@ class UpdaterTests {
270271
}
271272

272273
@Test
273-
fun collectResponseAfterWriting() = testScope.runTest {
274+
fun collectResponseAfterWritingWithSourceOfTruth() {
274275
val ttl = inHours(1)
275276

276-
val store = StoreBuilder.from<NotesKey, NetworkNote>(
277-
fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) },
277+
val converter = NotesConverterProvider().provide()
278+
val validator = NotesValidator()
279+
val updater = NotesUpdaterProvider(api).provide()
280+
281+
val store = MutableStoreBuilder.from<NotesKey, NetworkNote, InputNote, OutputNote>(
282+
fetcher = Fetcher.ofFlow { key ->
283+
val network = api.get(key, ttl = ttl)
284+
flow { emit(network) }
285+
},
286+
sourceOfTruth = SourceOfTruth.of(
287+
nonFlowReader = { key -> notes.get(key) },
288+
writer = { key, sot -> notes.put(key, sot) },
289+
delete = { key -> notes.clear(key) },
290+
deleteAll = { notes.clear() }
291+
),
292+
converter
278293
)
279-
.cachePolicy(MemoryPolicy.builder<NotesKey, NetworkNote>().setExpireAfterWrite(10.minutes).build())
280-
.build().asMutableStore<NotesKey, NetworkNote, NetworkNote, NetworkNote, NetworkNote>(
294+
.validator(validator)
295+
.build(
296+
updater = updater,
297+
bookkeeper = null
298+
)
299+
300+
testCollectResponseAfterWriting(store, ttl)
301+
}
302+
303+
@Test
304+
fun collectResponseAfterWritingWithoutSourceOfTruth() {
305+
val ttl = inHours(1)
306+
307+
val store = StoreBuilder.from<NotesKey, OutputNote>(
308+
fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) },
309+
)
310+
.cachePolicy(MemoryPolicy.builder<NotesKey, OutputNote>().setExpireAfterWrite(10.minutes).build())
311+
.build().asMutableStore<NotesKey, OutputNote, OutputNote, OutputNote, OutputNote>(
281312
Updater.by(
282313
{ _, v -> UpdaterResult.Success.Typed(v) },
283314
),
284315
null,
285316
)
286317

318+
testCollectResponseAfterWriting(store, ttl)
319+
}
320+
321+
private fun testCollectResponseAfterWriting(
322+
store: MutableStore<NotesKey, OutputNote>,
323+
ttl: Long,
324+
) = testScope.runTest {
287325
val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id))
288326

289327
store.stream<NotesWriteResponse>(readRequest).test {
290328
assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem())
291329
assertEquals(
292330
StoreReadResponse.Data(
293-
NetworkNote(NoteData.Single(Notes.One), ttl = ttl),
331+
OutputNote(NoteData.Single(Notes.One), ttl = ttl),
294332
StoreReadResponseOrigin.Fetcher()
295333
),
296334
awaitItem()
297335
)
298336

299337
val newNote = Notes.One.copy(title = "New Title-1")
300-
val writeRequest = StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
338+
val writeRequest = StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
301339
key = NotesKey.Single(Notes.One.id),
302-
value = NetworkNote(NoteData.Single(newNote), 0)
340+
value = OutputNote(NoteData.Single(newNote), 0)
303341
)
304342

305343
val storeWriteResponse = store.write(writeRequest)
306344

307-
// Write is success
308-
assertEquals(
309-
StoreWriteResponse.Success.Typed(
310-
NetworkNote(NoteData.Single(newNote), 0)
311-
),
312-
storeWriteResponse
313-
)
345+
assertTrue(storeWriteResponse is StoreWriteResponse.Success)
314346

315347
// New data added by 'write' is collected
316348

317349
assertEquals(
318-
NetworkNote(NoteData.Single(newNote), 0),
350+
OutputNote(NoteData.Single(newNote), 0),
319351
awaitItem().requireData()
320352
)
321353

322354
// different key, not collected
323-
store.write(StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
355+
store.write(StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
324356
key = NotesKey.Single(Notes.Five.id),
325-
value = NetworkNote(NoteData.Single(newNote), 0)
357+
value = OutputNote(NoteData.Single(newNote), 0)
326358
))
327359
}
328360

@@ -331,7 +363,7 @@ class UpdaterTests {
331363
val cachedStream = store.stream<NotesWriteResponse>(cachedReadRequest)
332364

333365
assertEquals(
334-
NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
366+
OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
335367
cachedStream.first().requireData()
336368
)
337369
}

0 commit comments

Comments
 (0)