Skip to content

Commit

Permalink
improvements in FSM
Browse files Browse the repository at this point in the history
  • Loading branch information
InsanusMokrassar committed Jan 17, 2025
1 parent fd41bf0 commit ce717a4
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 23 deletions.
1 change: 1 addition & 0 deletions fsm/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ kotlin {
dependencies {
api project(":micro_utils.common")
api project(":micro_utils.coroutines")
api libs.kslog
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dev.inmo.micro_utils.fsm.common

import dev.inmo.kslog.common.TagLogger
import dev.inmo.kslog.common.e
import dev.inmo.micro_utils.common.Optional
import dev.inmo.micro_utils.coroutines.*
import dev.inmo.micro_utils.fsm.common.utils.StateHandlingErrorHandler
Expand Down Expand Up @@ -68,6 +70,7 @@ open class DefaultStatesMachine <T: State>(
protected val handlers: List<CheckableHandlerHolder<in T, T>>,
protected val onStateHandlingErrorHandler: StateHandlingErrorHandler<T> = defaultStateHandlingErrorHandler()
) : StatesMachine<T> {
protected val logger = TagLogger(this::class.simpleName!!)
/**
* Will call [launchStateHandling] for state handling
*/
Expand Down Expand Up @@ -96,7 +99,13 @@ open class DefaultStatesMachine <T: State>(
statesJobsMutex.withLock {
statesJobs[actualState] ?.cancel()
statesJobs[actualState] = scope.launch {
performUpdate(actualState)
runCatching {
performUpdate(actualState)
}.onFailure {
logger.e(it) {
"Unable to perform update of state from $actualState"
}
}.getOrThrow()
}.also { job ->
job.invokeOnCompletion { _ ->
scope.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ interface StatesManager<T : State> {


/**
* Must set current set using [State.context]
* It is expected, that [new] state will be saved in manager.
*
* If [new] context will not be equal to [old] one, it must do some check of availability for replacement
* of potentially exists state on [new] context. If this state can't be replaced, it will throw [IllegalStateException]
*
* @throws IllegalStateException - in case when [new] [State] can't be set
*/
suspend fun update(old: T, new: T)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.inmo.micro_utils.fsm.common

import dev.inmo.kslog.common.e
import dev.inmo.micro_utils.common.*
import dev.inmo.micro_utils.fsm.common.utils.StateHandlingErrorHandler
import dev.inmo.micro_utils.fsm.common.utils.defaultStateHandlingErrorHandler
Expand Down Expand Up @@ -44,7 +45,13 @@ open class DefaultUpdatableStatesMachine<T : State>(
val job = previousState.mapOnPresented {
statesJobs.remove(it)
} ?.takeIf { it.isActive } ?: scope.launch {
performUpdate(actualState)
runCatching {
performUpdate(actualState)
}.onFailure {
logger.e(it) {
"Unable to perform update of state up to $actualState"
}
}.getOrThrow()
}.also { job ->
job.invokeOnCompletion { _ ->
scope.launch {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package dev.inmo.micro_utils.fsm.common.managers

import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.fsm.common.State
import dev.inmo.micro_utils.fsm.common.StatesManager
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
* Implement this repo if you want to use some custom repo for [DefaultStatesManager]
Expand All @@ -19,6 +20,14 @@ interface DefaultStatesManagerRepo<T : State> {
* NOT be removed
*/
suspend fun removeState(state: T)

/**
* Semantically, calls [removeState] and then [set]
*/
suspend fun removeAndSet(toRemove: T, toSet: T) {
removeState(toRemove)
set(toSet)
}
/**
* @return Current list of available and saved states
*/
Expand Down Expand Up @@ -58,7 +67,7 @@ open class DefaultStatesManager<T : State>(
protected val _onEndChain = MutableSharedFlow<T>(0)
override val onEndChain: Flow<T> = _onEndChain.asSharedFlow()

protected val mapMutex = Mutex()
protected val internalLocker = SmartRWLocker()

constructor(
repo: DefaultStatesManagerRepo<T>,
Expand All @@ -68,28 +77,30 @@ open class DefaultStatesManager<T : State>(
onUpdateContextsConflictResolver = onContextsConflictResolver
)

override suspend fun update(old: T, new: T) = mapMutex.withLock {
override suspend fun update(old: T, new: T) = internalLocker.withWriteLock {
val stateByOldContext: T? = repo.getContextState(old.context)
when {
stateByOldContext != old -> return@withLock
stateByOldContext == null || old.context == new.context -> {
repo.removeState(old)
repo.set(new)
stateByOldContext != old -> return@withWriteLock
old.context == new.context -> {
repo.removeAndSet(old, new)
_onChainStateUpdated.emit(old to new)
}
else -> {
old.context != new.context -> {
val stateOnNewOneContext = repo.getContextState(new.context)
if (stateOnNewOneContext == null || onUpdateContextsConflictResolver(old, new, stateOnNewOneContext)) {
stateOnNewOneContext ?.let { endChainWithoutLock(it) }
repo.removeState(old)
repo.set(new)
repo.removeAndSet(old, new)
_onChainStateUpdated.emit(old to new)
} else {
error(
"Unable to update state from $old to $new due to false answer from $onUpdateContextsConflictResolver and state on old context $stateOnNewOneContext"
)
}
}
}
}

override suspend fun startChain(state: T) = mapMutex.withLock {
override suspend fun startChain(state: T) = internalLocker.withWriteLock {
val stateOnContext = repo.getContextState(state.context)
if (stateOnContext == null || onStartContextsConflictResolver(stateOnContext, state)) {
stateOnContext ?.let {
Expand All @@ -108,11 +119,13 @@ open class DefaultStatesManager<T : State>(
}

override suspend fun endChain(state: T) {
mapMutex.withLock {
internalLocker.withWriteLock {
endChainWithoutLock(state)
}
}

override suspend fun getActiveStates(): List<T> = repo.getStates()
override suspend fun getActiveStates(): List<T> = internalLocker.withReadAcquire {
repo.getStates()
}

}
Original file line number Diff line number Diff line change
@@ -1,25 +1,59 @@
package dev.inmo.micro_utils.fsm.repos.common

import dev.inmo.kslog.common.TagLogger
import dev.inmo.kslog.common.i
import dev.inmo.micro_utils.coroutines.SmartRWLocker
import dev.inmo.micro_utils.coroutines.withReadAcquire
import dev.inmo.micro_utils.coroutines.withWriteLock
import dev.inmo.micro_utils.fsm.common.State
import dev.inmo.micro_utils.fsm.common.managers.DefaultStatesManagerRepo
import dev.inmo.micro_utils.repos.*
import dev.inmo.micro_utils.repos.pagination.getAll
import dev.inmo.micro_utils.repos.unset

class KeyValueBasedDefaultStatesManagerRepo<T : State>(
private val keyValueRepo: KeyValueRepo<Any, T>
) : DefaultStatesManagerRepo<T> {
private val locker = SmartRWLocker()
private val logger = TagLogger("KeyValueBasedDefaultStatesManagerRepo")
override suspend fun set(state: T) {
keyValueRepo.set(state.context, state)
locker.withWriteLock {
keyValueRepo.set(state.context, state)
logger.i { "Set ${state.context} value to $state" }
}
}

override suspend fun removeState(state: T) {
if (keyValueRepo.get(state.context) == state) {
keyValueRepo.unset(state.context)
locker.withWriteLock {
if (keyValueRepo.get(state.context) == state) {
keyValueRepo.unset(state.context)
logger.i { "Unset $state" }
}
}
}

override suspend fun getStates(): List<T> = keyValueRepo.getAll { keys(it) }.map { it.second }
override suspend fun getContextState(context: Any): T? = keyValueRepo.get(context)
override suspend fun removeAndSet(toRemove: T, toSet: T) {
locker.withWriteLock {
when {
toRemove.context == toSet.context -> {
keyValueRepo.set(toSet.context, toSet)
}
else -> {
keyValueRepo.set(toSet.context, toSet)
keyValueRepo.unset(toRemove)
}
}
}
}

override suspend fun contains(context: Any): Boolean = keyValueRepo.contains(context)
override suspend fun getStates(): List<T> = locker.withReadAcquire {
keyValueRepo.getAll { keys(it) }.map { it.second }
}
override suspend fun getContextState(context: Any): T? = locker.withReadAcquire {
keyValueRepo.get(context)
}

override suspend fun contains(context: Any): Boolean = locker.withReadAcquire {
keyValueRepo.contains(context)
}
}

0 comments on commit ce717a4

Please sign in to comment.