Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

santio/fix/multi connection pool #20

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ kotlinx-coroutines = "1.9.0-RC"
gson = "2.11.0"
slf4j = "2.0.13"
sqlite = "3.46.0.0"
log4j = "2.23.0"
evo = "1.3"
logback = "1.5.8"

[plugins]
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand All @@ -22,14 +22,12 @@ evo = { module = "org.atteo:evo-inflector", version.ref = "evo" }

test-kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "kotlinx-coroutines" }
test-sqlite = { module = "org.xerial:sqlite-jdbc", version.ref = "sqlite" }
test-log4j = { module = "org.apache.logging.log4j:log4j-core", version.ref = "log4j" }
test-log4j-impl = { module = "org.apache.logging.log4j:log4j-slf4j2-impl", version.ref = "log4j" }
test-logback = { module = "ch.qos.logback:logback-classic", version.ref = "logback" }

[bundles]
testing = [
"test-kotlinx-coroutines",
"kotlinx-serialization",
"test-sqlite",
"test-log4j",
"test-log4j-impl",
"test-logback"
]
26 changes: 18 additions & 8 deletions src/main/kotlin/gg/ingot/iron/Iron.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ class Iron internal constructor(
/** The default executor to use if one isn't specified */
private val executor = CoroutineIronExecutor(this)

/** Whether Iron is currently closed. */
val isClosed: Boolean
get() = pool == null

/**
* Establishes a connection to the database using the provided connection string.
*
Expand All @@ -74,10 +78,10 @@ class Iron internal constructor(

pool = if(settings.isMultiConnectionPool) {
logger.trace("Using multi connection pool.")
MultiConnectionPool(connectionString, settings)
MultiConnectionPool(connectionString, this)
} else {
logger.trace("Using single connection pool.")
SingleConnectionPool(connectionString, settings)
SingleConnectionPool(connectionString, this)
}

return this
Expand Down Expand Up @@ -117,8 +121,10 @@ class Iron internal constructor(
val connection = pool?.connection()
?: error("Connection is not open, call connect() before using the connection.")

return block(connection)
.also { pool?.release(connection) }
val response = block(connection)
pool?.release(connection)

return response
}

/**
Expand All @@ -130,16 +136,20 @@ class Iron internal constructor(
val connection = pool?.connection()
?: error("Connection is not open, call connect() before using the connection.")

return block(connection)
.also { pool?.release(connection) }
val response = block(connection)
pool?.release(connection)

return response
}

/**
* Closes the connection to the database.
* @since 1.0
*/
fun close() {
pool?.close()
@JvmOverloads
fun close(force: Boolean = false) {
pool?.close(force)
pool = null
}

/**
Expand Down
9 changes: 8 additions & 1 deletion src/main/kotlin/gg/ingot/iron/IronSettings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ import java.util.*
import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.toKotlinDuration

/**
* Settings for the Iron connection pool.
* @author DebitCardz
* @since 1.2
*/

data class IronSettings internal constructor(
/** Minimum active connections in the pool. */
var minimumActiveConnections: Int = 1,
/** Maximum connections allowed to be generated by a pool. */
var maximumConnections: Int = minimumActiveConnections,
/** The timeout for connection polling. */
var connectionPollTimeout: Duration = 30.seconds,
/** How long connections should last before being closed if above the minimum */
var connectionTTL: Duration = 10.seconds,
/** The driver to use for the connection pool. */
var driver: DatabaseDriver? = null,
/** The serialization adapter to use for models. */
Expand Down Expand Up @@ -146,6 +150,7 @@ data class IronSettings internal constructor(
private var minimumActiveConnections: Int = 1
private var maximumConnections: Int = minimumActiveConnections
private var connectionPollTimeout: Duration = 30.seconds
private var connectionTTL: Duration = 10.seconds
private var driver: DatabaseDriver? = null
private var serialization: SerializationAdapter? = null
private var driverProperties: Properties? = null
Expand All @@ -154,7 +159,8 @@ data class IronSettings internal constructor(

fun minimumActiveConnections(minimumActiveConnections: Int) = apply { this.minimumActiveConnections = minimumActiveConnections }
fun maximumConnections(maximumConnections: Int) = apply { this.maximumConnections = maximumConnections }
fun connectionPollTimeout(connectionPollTimeout: Duration) = apply { this.connectionPollTimeout = connectionPollTimeout }
fun connectionPollTimeout(connectionPollTimeout: java.time.Duration) = apply { this.connectionPollTimeout = connectionPollTimeout.toKotlinDuration() }
fun connectionTTL(ttl: java.time.Duration) = apply { this.connectionTTL = ttl.toKotlinDuration() }
fun driver(driver: DatabaseDriver) = apply { this.driver = driver }
fun serialization(serialization: SerializationAdapter) = apply { this.serialization = serialization }
fun driverProperties(driverProperties: Properties) = apply { this.driverProperties = driverProperties }
Expand All @@ -166,6 +172,7 @@ data class IronSettings internal constructor(
minimumActiveConnections,
maximumConnections,
connectionPollTimeout,
connectionTTL,
driver,
serialization,
driverProperties,
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/gg/ingot/iron/pool/ConnectionPool.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ interface ConnectionPool {

/**
* Closes the connection pool.
* @param force If the pool should be closed immediately, or wait for all connections to be released.
*/
fun close()
fun close(force: Boolean)

/**
* Uses a connection from the pool and releases it after the block is executed.
Expand Down
128 changes: 65 additions & 63 deletions src/main/kotlin/gg/ingot/iron/pool/MultiConnectionPool.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gg.ingot.iron.pool

import gg.ingot.iron.IronSettings
import gg.ingot.iron.Iron
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
Expand All @@ -11,96 +11,60 @@ import java.sql.DriverManager
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.time.Duration.Companion.seconds

/**
* A connection pool that manages multiple connections to a database.
*
* This pool will attempt to keep a minimum amount of connections open at all times, and will create new connections
* as needed up to a maximum amount of connections, any connections past the minimum will be available to use
* up until it hits its TTL or is closed.
*
* @author DebitCardz
* @since 1.2
*/
class MultiConnectionPool(
private val connectionString: String,
private val settings: IronSettings,
private val iron: Iron,
) : ConnectionPool {
private val logger = LoggerFactory.getLogger(MultiConnectionPool::class.java)

private val coroutineScope = CoroutineScope(settings.dispatcher + SupervisorJob())
private val coroutineScope = CoroutineScope(iron.settings.dispatcher + SupervisorJob())

/** The pool of connections. */
private val pool = ArrayBlockingQueue<Connection>(settings.minimumActiveConnections)
/** The pool of persistent connections. */
private val pool = ArrayBlockingQueue<Connection>(iron.settings.minimumActiveConnections)

/** The amount of open connections in the pool. */
/** The amount of open connections */
private val openConnections = AtomicInteger(0)

init {
logger.trace(
"Creating multi connection pool with minimum connections of {} and maximum connections of {}.",
settings.minimumActiveConnections,
settings.maximumConnections
iron.settings.minimumActiveConnections,
iron.settings.maximumConnections
)

// always keep a minimum amount of connections
repeat(settings.minimumActiveConnections) {
// Create persistent connections
repeat(iron.settings.minimumActiveConnections) {
pool.add(createConnection())
}
openConnections.set(settings.minimumActiveConnections)
}

override fun connection(): Connection {
val timeoutMs = settings.connectionPollTimeout
.inWholeMilliseconds

// if we're at the complete maximum amt of connections
// we have to wait for one to be released to us
if(openConnections.get() >= settings.maximumConnections) {
val acquiredConnection = pool.poll(timeoutMs, TimeUnit.MILLISECONDS)
if(acquiredConnection != null) {
openConnections.incrementAndGet()
}

if(acquiredConnection?.isClosed == true) {
openConnections.decrementAndGet()
logger.warn("Acquired a closed connection, attempting recovery.")
return connection()
}

return acquiredConnection ?: error("Failed to get a connection from the pool.")
}
var connection = pull() ?: error("No connection available in the pool, try increasing the maximum connections.")

// if we have an available connection, use it or if we haven't
// hit our max just create one
val acquiredConnection = pool.poll() ?: run {
// incr open conns if we have to establish a new one
openConnections.incrementAndGet()
createConnection()
if(connection.isClosed) {
logger.error("Acquired a closed connection, recreating connection.")
connection = createConnection()
}

if(acquiredConnection.isClosed) {
openConnections.decrementAndGet()
logger.warn("Acquired a closed connection, attempting recovery.")
return connection()
}

return acquiredConnection ?: error("Failed to get a connection from the pool.")
return connection
}

override fun release(connection: Connection) {
// ensure releasing doesn't block the consumer
coroutineScope.launch {
runCatching {
// give some buffer room in case we need to handle bursts
// this'll wait a bit so if connections are taken we have one
// ready to be put back in the pool
val success = pool.offer(
connection,
10.seconds.inWholeMilliseconds,
TimeUnit.MILLISECONDS
)

if (!success) {
connection.close()
openConnections.decrementAndGet()
}
push(connection)
}.onFailure {
logger.error("Failed to release connection back to the pool, {}", it.message)

Expand All @@ -110,15 +74,53 @@ class MultiConnectionPool(
}
}

override fun close() {
coroutineScope.cancel()
pool.forEach(Connection::close)
openConnections.set(0)
override fun close(force: Boolean) {
logger.trace("Closing connection pool, force: {}", force)
if (force) {
coroutineScope.cancel()
pool.forEach(Connection::close)
}
}

private fun push(connection: Connection) {
SantioMC marked this conversation as resolved.
Show resolved Hide resolved
if (iron.isClosed) {
logger.debug("Iron is closed, cannot push connection back to the pool.")
connection.close()
return
}

val offered = pool.offer(connection, iron.settings.connectionPollTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
if (!offered) {
logger.trace("Connection pool is full, closing connection.")
connection.close()
}

logger.trace("Connection task finished, open connections: {}", openConnections.get())
openConnections.decrementAndGet()
DebitCardz marked this conversation as resolved.
Show resolved Hide resolved
}

private fun pull(): Connection? {
SantioMC marked this conversation as resolved.
Show resolved Hide resolved
if (iron.isClosed) {
logger.debug("Iron is closed, cannot pull new connection.")
return null
}

if (openConnections.get() >= iron.settings.maximumConnections) {
logger.trace("Maximum connections reached, waiting for a connection to be released.")
return pool.poll(iron.settings.connectionPollTimeout.inWholeMilliseconds, TimeUnit.MILLISECONDS)
}

val connection = pool.poll() ?: createConnection()
openConnections.incrementAndGet()

logger.trace("Acquired a connection from the pool, open connections: {}", openConnections.get())
DebitCardz marked this conversation as resolved.
Show resolved Hide resolved
return connection
}

private fun createConnection(): Connection {
return if(settings.driverProperties != null) {
DriverManager.getConnection(connectionString, settings.driverProperties)
logger.trace("Creating a new connection for the pool.")
return if(iron.settings.driverProperties != null) {
DriverManager.getConnection(connectionString, iron.settings.driverProperties)
} else {
DriverManager.getConnection(connectionString)
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/kotlin/gg/ingot/iron/pool/SingleConnectionPool.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package gg.ingot.iron.pool

import gg.ingot.iron.IronSettings
import gg.ingot.iron.Iron
import java.sql.Connection
import java.sql.DriverManager

Expand All @@ -11,10 +11,10 @@ import java.sql.DriverManager
*/
class SingleConnectionPool(
connectionString: String,
settings: IronSettings
iron: Iron
) : ConnectionPool {
private val connection = if(settings.driverProperties != null) {
DriverManager.getConnection(connectionString, settings.driverProperties)
private val connection = if(iron.settings.driverProperties != null) {
DriverManager.getConnection(connectionString, iron.settings.driverProperties)
} else {
DriverManager.getConnection(connectionString)
}
Expand All @@ -25,7 +25,7 @@ class SingleConnectionPool(

override fun release(connection: Connection) = Unit

override fun close() {
override fun close(force: Boolean) {
connection.close()
}
}
Loading