Skip to content

Commit

Permalink
Avoid deadlock in RealMutableStore (#658)
Browse files Browse the repository at this point in the history
* Add test case

Signed-off-by: Amr Yousef <[email protected]>

* Always Release storeLock

Signed-off-by: Amr Yousef <[email protected]>

* Update kermit to 2.0.4 (#655)

Fixes #653 and #654

Signed-off-by: Scott Olcott <[email protected]>
Signed-off-by: Amr Yousef <[email protected]>

* Revert "Update kermit to 2.0.4 (#655)"

This reverts commit 76f34d4.

Signed-off-by: Amr Yousef <[email protected]>

---------

Signed-off-by: Amr Yousef <[email protected]>
Signed-off-by: Scott Olcott <[email protected]>
Co-authored-by: Scott Olcott <[email protected]>
  • Loading branch information
amrfarid140 and solcott authored Oct 4, 2024
1 parent 22bb733 commit 067bd41
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,13 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
block: suspend ThreadSafety.() -> Output,
): Output {
storeLock.lock()
val threadSafety = requireNotNull(keyToThreadSafety[key])
val output = threadSafety.block()
storeLock.unlock()
return output
try {
val threadSafety = requireNotNull(keyToThreadSafety[key])
val output = threadSafety.block()
return output
} finally {
storeLock.unlock()
}
}

private suspend fun conflictsMightExist(key: Key): Boolean {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
package org.mobilenativefoundation.store.store5

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.impl.extensions.get
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.hours

@FlowPreview
Expand Down Expand Up @@ -39,4 +48,87 @@ class StoreWithInMemoryCacheTests {
assertEquals("result", c)
assertEquals("result", d)
}

@Test
fun storeDeadlock() =
testScope.runTest {
repeat(1000) {
val store =
StoreBuilder
.from(
fetcher = Fetcher.of { key: Int -> "fetcher_${key}" },
sourceOfTruth = SourceOfTruth.Companion.of(
reader = { key ->
flow<String> {
emit("source_of_truth_${key}")
}
},
writer = { key: Int, local: String ->

}
)
)
.disableCache()
.toMutableStoreBuilder(
converter = object : Converter<String, String, String> {
override fun fromNetworkToLocal(network: String): String {
return network
}

override fun fromOutputToLocal(output: String): String {
return output
}
},
)
.build(
updater = object : Updater<Int, String, Unit> {
var callCount = -1
override suspend fun post(key: Int, value: String): UpdaterResult {
callCount += 1
if (callCount % 2 == 0) {
throw IllegalArgumentException(key.toString() + "value:$value")
} else {
return UpdaterResult.Success.Untyped("")
}
}

override val onCompletion: OnUpdaterCompletion<Unit>?
get() = null

}
)

val jobs = mutableListOf<Job>()
jobs.add(
store.stream<Nothing>(StoreReadRequest.cached(1, refresh = true))
.mapNotNull { it.dataOrNull() }
.launchIn(CoroutineScope(Dispatchers.Default))
)
val job1 = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
.mapNotNull { it.dataOrNull() }
.launchIn(CoroutineScope(Dispatchers.Default))
jobs.add(
store.stream<Nothing>(StoreReadRequest.cached(2, refresh = true))
.mapNotNull { it.dataOrNull() }
.launchIn(CoroutineScope(Dispatchers.Default)))
jobs.add(
store.stream<Nothing>(StoreReadRequest.cached(3, refresh = true))
.mapNotNull { it.dataOrNull() }
.launchIn(CoroutineScope(Dispatchers.Default)))
job1.cancel()
assertEquals(
expected = "source_of_truth_0",
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
.mapNotNull { it.dataOrNull() }.first()
)
jobs.forEach {
it.cancel()
assertEquals(
expected = "source_of_truth_0",
actual = store.stream<Nothing>(StoreReadRequest.cached(0, refresh = true))
.mapNotNull { it.dataOrNull() }.first()
)
}
}
}
}

0 comments on commit 067bd41

Please sign in to comment.