#714 Release all locks when Pramen job exits abnormally.#715
#714 Release all locks when Pramen job exits abnormally.#715
Conversation
WalkthroughThe changes introduce a token lock registry system to centrally manage token locks across the pipeline lifecycle. A new abstract Changes
Sequence DiagramsequenceDiagram
participant Job as Job / Pipeline
participant Lock as Token Lock Instance
participant Registry as TokenLockRegistry
participant State as PipelineStateImpl
Job->>Lock: tryAcquire()
activate Lock
Lock->>Lock: compute isAcquired flag
alt Acquisition Successful
Lock->>Registry: registerLock(this)
activate Registry
Registry->>Registry: add to active locks list
deactivate Registry
Lock->>Job: return true
else Acquisition Failed
Lock->>Job: return false
end
deactivate Lock
Job->>Lock: release()
activate Lock
Lock->>Lock: release lock resource
Lock->>Registry: unregisterLock(this)
activate Registry
Registry->>Registry: remove from active locks list
deactivate Registry
deactivate Lock
Job->>Job: System.exit() / job finish
Job->>State: onAppFinish()
activate State
State->>Registry: releaseAllLocks()
activate Registry
Registry->>Registry: snapshot active locks
loop For each lock
Registry->>Lock: release()
activate Lock
Lock->>Lock: release resource
deactivate Lock
end
Registry->>Registry: clear registry
deactivate Registry
State->>State: send notifications & emails
deactivate State
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala (1)
100-106:⚠️ Potential issue | 🟠 MajorEnsure registry unregistration and hook removal run even when guard release fails.
If
releaseGuardLock()throws,TokenLockRegistry.unregisterLock(this)andJvmUtils.safeRemoveShutdownHook(shutdownHook)are skipped, leaving stale lifecycle state.🛠️ Proposed fix
if (wasAcquired) { watcherThreadOpt.foreach(_.interrupt()) watcherThreadOpt = None - releaseGuardLock() - JvmUtils.safeRemoveShutdownHook(shutdownHook) - TokenLockRegistry.unregisterLock(this) + try { + releaseGuardLock() + } finally { + JvmUtils.safeRemoveShutdownHook(shutdownHook) + TokenLockRegistry.unregisterLock(this) + } log.info(s"Lock released: '$escapedToken'.") }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala` around lines 100 - 106, The existing release path may skip JvmUtils.safeRemoveShutdownHook(shutdownHook) and TokenLockRegistry.unregisterLock(this) if releaseGuardLock() throws; update the code to ensure shutdownHook removal and registry unregistration always run by wrapping releaseGuardLock() in a try/finally (or equivalent) so that after attempting releaseGuardLock() you always call JvmUtils.safeRemoveShutdownHook(shutdownHook) and TokenLockRegistry.unregisterLock(this); preserve the existing watcherThreadOpt interruption and watcherThreadOpt = None/wasAcquired handling but move the removal/unregister calls into the finally block to guarantee cleanup even on exceptions.
🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala (1)
202-214: Make lock cleanup unconditional inonAppFinish().
TokenLockRegistry.releaseAllLocks()is currently the last statement. Wrapping preceding steps intry/finallykeeps lock release guaranteed even if new failure paths are introduced later.♻️ Proposed refactor
private[state] def onAppFinish(): Unit = { if (!exitedNormally && failureException.isEmpty && signalException.isDefined) { failureException = signalException exitCode |= EXIT_CODE_SIGNAL_RECEIVED } finishedInstant = Option(Instant.now()) - sendPipelineNotifications() - runCustomShutdownHook() - removeSignalHandlers() - sendNotificationEmail() - TokenLockRegistry.releaseAllLocks() + try { + sendPipelineNotifications() + runCustomShutdownHook() + removeSignalHandlers() + sendNotificationEmail() + } finally { + TokenLockRegistry.releaseAllLocks() + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala` around lines 202 - 214, The lock cleanup call TokenLockRegistry.releaseAllLocks() must run unconditionally from onAppFinish(); wrap the existing sequence (the conditional setting of failureException/exitCode, finishedInstant assignment, sendPipelineNotifications(), runCustomShutdownHook(), removeSignalHandlers(), sendNotificationEmail()) in a try block and move TokenLockRegistry.releaseAllLocks() into the finally block so locks are always released even if any of those methods throw; preserve existing variable updates (exitedNormally, failureException, exitCode, finishedInstant) and ensure exceptions continue to propagate after locks are released.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala (1)
102-108: Strengthen this test to verify external lock release, not only internal state.Checking
isAcquiredconfirms local state, but re-acquiring the same token validates that the backing lock was actually released.🧪 Proposed test improvement
"lock registry releases all locks" in { val lock1 = getLock("token1") assert(lock1.tryAcquire()) TokenLockRegistry.releaseAllLocks() assert(!lock1.asInstanceOf[TokenLockBase].isAcquired) + + val lock2 = getLock("token1") + assert(lock2.tryAcquire()) + lock2.release() }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala` around lines 102 - 108, The test currently only checks internal state via TokenLockBase.isAcquired after calling TokenLockRegistry.releaseAllLocks(); instead, after releasing, attempt to actually re-acquire the backing lock for the same token to ensure external release: call TokenLockRegistry.releaseAllLocks(), then obtain a new lock for the same token using getLock("token1") (or reuse lock1) and assert that tryAcquire() returns true (and release it afterwards). Reference getLock, TokenLockRegistry.releaseAllLocks, TokenLockBase.isAcquired and tryAcquire to locate where to add the re-acquire assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala`:
- Around line 55-60: The catch block in TokenLockRegistry where
currentListCopy.foreach calls l.release() swallows the exception; update the
NonFatal(ex) handler to include the exception when logging so failures to
release locks are visible—e.g., change the log.warn call in the catch for
NonFatal(ex) to pass the exception (or its message) along with the descriptive
text that includes l.token, ensuring the warning contains the exception/stack
trace for diagnostics.
---
Outside diff comments:
In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scala`:
- Around line 100-106: The existing release path may skip
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this) if releaseGuardLock() throws; update the
code to ensure shutdownHook removal and registry unregistration always run by
wrapping releaseGuardLock() in a try/finally (or equivalent) so that after
attempting releaseGuardLock() you always call
JvmUtils.safeRemoveShutdownHook(shutdownHook) and
TokenLockRegistry.unregisterLock(this); preserve the existing watcherThreadOpt
interruption and watcherThreadOpt = None/wasAcquired handling but move the
removal/unregister calls into the finally block to guarantee cleanup even on
exceptions.
---
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala`:
- Around line 202-214: The lock cleanup call TokenLockRegistry.releaseAllLocks()
must run unconditionally from onAppFinish(); wrap the existing sequence (the
conditional setting of failureException/exitCode, finishedInstant assignment,
sendPipelineNotifications(), runCustomShutdownHook(), removeSignalHandlers(),
sendNotificationEmail()) in a try block and move
TokenLockRegistry.releaseAllLocks() into the finally block so locks are always
released even if any of those methods throw; preserve existing variable updates
(exitedNormally, failureException, exitCode, finishedInstant) and ensure
exceptions continue to propagate after locks are released.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala`:
- Around line 102-108: The test currently only checks internal state via
TokenLockBase.isAcquired after calling TokenLockRegistry.releaseAllLocks();
instead, after releasing, attempt to actually re-acquire the backing lock for
the same token to ensure external release: call
TokenLockRegistry.releaseAllLocks(), then obtain a new lock for the same token
using getLock("token1") (or reuse lock1) and assert that tryAcquire() returns
true (and release it afterwards). Reference getLock,
TokenLockRegistry.releaseAllLocks, TokenLockBase.isAcquired and tryAcquire to
locate where to add the re-acquire assertion.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLock.scalapramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockAllow.scalapramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockBase.scalapramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryAllow.scalapramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scalapramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scalapramen/core/src/test/scala/za/co/absa/pramen/core/mocks/lock/TokenLockMock.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockJdbcSuite.scala
| currentListCopy.foreach { l => | ||
| try { | ||
| l.release() | ||
| } catch { | ||
| case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}") | ||
| } |
There was a problem hiding this comment.
Include the exception in warning logs for failed lock releases.
The catch block captures ex but currently drops stack/context, which makes unlock failures hard to diagnose.
📝 Proposed fix
currentListCopy.foreach { l =>
try {
l.release()
} catch {
- case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}")
+ case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}", ex)
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| currentListCopy.foreach { l => | |
| try { | |
| l.release() | |
| } catch { | |
| case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}") | |
| } | |
| currentListCopy.foreach { l => | |
| try { | |
| l.release() | |
| } catch { | |
| case NonFatal(ex) => log.warn(s"Unable to release the lock: ${l.token}", ex) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockRegistry.scala`
around lines 55 - 60, The catch block in TokenLockRegistry where
currentListCopy.foreach calls l.release() swallows the exception; update the
NonFatal(ex) handler to include the exception when logging so failures to
release locks are visible—e.g., change the log.warn call in the catch for
NonFatal(ex) to pass the exception (or its message) along with the descriptive
text that includes l.token, ensuring the warning contains the exception/stack
trace for diagnostics.
Unit Test Coverage
Files
|
Closes #714
Summary by CodeRabbit
New Features
Refactor