Skip to content

Commit

Permalink
resolved deps conflict error
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Jan 27, 2025
1 parent a02bef1 commit b47b1ae
Show file tree
Hide file tree
Showing 9 changed files with 20 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class KafkaEventRouter(private val config: Map<String, String>,
private val db: GraphDatabaseService,
private val log: Log) {

/*override*/ val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration
val eventRouterConfiguration: StreamsEventRouterConfiguration = StreamsEventRouterConfiguration
.from(config, db.databaseName(), db.isDefaultDb(), log)


Expand All @@ -44,21 +44,20 @@ class KafkaEventRouter(private val config: Map<String, String>,
else -> KafkaStatus.STOPPED
}

/*override*/ fun start() = runBlocking {
fun start() = runBlocking {
mutex.withLock(producer) {
if (status(producer) == KafkaStatus.RUNNING) {
return@runBlocking
}
log.info("Initialising Kafka Connector")
kafkaAdminService.start()
val props = kafkaConfig.asProperties()
producer = Neo4jKafkaProducer(props)
producer!!.initTransactions()
log.info("Kafka Connector started")
}
}

/*override*/ fun stop() = runBlocking {
fun stop() = runBlocking {
mutex.withLock(producer) {
if (status(producer) == KafkaStatus.STOPPED) {
return@runBlocking
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package apoc.kafka.common.support

import apoc.kafka.common.utils.Neo4jUtilsTest
import apoc.kafka.utils.KafkaUtil
import org.neo4j.driver.AuthToken
import org.neo4j.driver.AuthTokens
Expand Down Expand Up @@ -108,7 +109,7 @@ class Neo4jContainerExtension(dockerImage: String): Neo4jContainer<Neo4jContaine

fun withKafka(network: Network, bootstrapServers: String): Neo4jContainerExtension {
withNetwork(network)
withNeo4jConfig("apoc.kafka.bootstrap.servers", bootstrapServers)
withNeo4jConfig(Neo4jUtilsTest.KAFKA_BOOTSTRAP_SERVER, bootstrapServers)
return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ class Neo4jUtilsTest {
companion object {
@ClassRule @JvmField
val db = ImpermanentDbmsRule()

val KAFKA_BOOTSTRAP_SERVER = "apoc.kafka.bootstrap.servers"
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.api.procedure.GlobalProcedures

import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import apoc.kafka.common.utils.Neo4jUtilsTest
import org.apache.kafka.common.serialization.ByteArraySerializer

open class KafkaEventSinkBaseTSE {
Expand Down Expand Up @@ -75,7 +76,7 @@ open class KafkaEventSinkBaseTSE {

fun createDbWithKafkaConfigs(vararg pairs: Pair<String, Any>) : GraphDatabaseService {
val mutableMapOf = mutableMapOf<String, Any>(
"apoc.kafka.bootstrap.servers" to KafkaEventSinkSuiteIT.kafka.bootstrapServers,
Neo4jUtilsTest.KAFKA_BOOTSTRAP_SERVER to KafkaEventSinkSuiteIT.kafka.bootstrapServers,
APOC_KAFKA_ENABLED to "true",
"bootstrap.servers" to KafkaEventSinkSuiteIT.kafka.bootstrapServers,
"apoc.kafka.sink.enabled" to "true"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package apoc.kafka.producer.integrations

import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
import apoc.kafka.events.OperationType
import apoc.kafka.events.StreamsTransactionEvent
import apoc.kafka.common.support.KafkaTestUtils
import apoc.kafka.common.support.KafkaTestUtils.getDbServices
import apoc.kafka.common.utils.Neo4jUtilsTest
import apoc.util.DbmsTestUtil
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.junit.*
Expand Down Expand Up @@ -38,20 +37,6 @@ open class KafkaEventRouterBaseTSE { // TSE (Test Suit Element)
KafkaEventRouterSuiteIT.tearDownContainer()
}
}

// common methods
fun isValidRelationship(event: StreamsTransactionEvent, type: OperationType) = when (type) {
OperationType.created -> event.payload.before == null
&& event.payload.after?.let { it.properties?.let { it.isNullOrEmpty() } } ?: false
&& event.schema.properties == emptyMap<String, String>()
OperationType.updated -> event.payload.before?.let { it.properties?.let { it.isNullOrEmpty() } } ?: false
&& event.payload.after?.let { it.properties == mapOf("type" to "update") } ?: false
&& event.schema.properties == mapOf("type" to "String")
OperationType.deleted -> event.payload.before?.let { it.properties == mapOf("type" to "update") } ?: false
&& event.payload.after == null
&& event.schema.properties == mapOf("type" to "String")
else -> throw IllegalArgumentException("Unsupported OperationType")
}
}

lateinit var kafkaConsumer: KafkaConsumer<String, ByteArray>
Expand All @@ -77,8 +62,7 @@ open class KafkaEventRouterBaseTSE { // TSE (Test Suit Element)
fun createDbWithKafkaConfigs(vararg pairs: Pair<String, Any>) : GraphDatabaseService {
val mutableMapOf = mutableMapOf<String, Any>(
APOC_KAFKA_ENABLED to "true",
"apoc.kafka.bootstrap.servers" to KafkaEventRouterSuiteIT.kafka.bootstrapServers,
"bootstrap.servers" to KafkaEventRouterSuiteIT.kafka.bootstrapServers
Neo4jUtilsTest.KAFKA_BOOTSTRAP_SERVER to KafkaEventRouterSuiteIT.kafka.bootstrapServers
)

mutableMapOf.putAll(mapOf(*pairs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
val db = createDbWithKafkaConfigs()

val topic = UUID.randomUUID().toString()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))
val message = "Hello World"
db.execute("CALL apoc.kafka.publish('$topic', '$message')")
Expand All @@ -41,7 +40,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
fun testProcedureWithKey() {
val db = createDbWithKafkaConfigs()
val topic = UUID.randomUUID().toString()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))
val message = "Hello World"
val keyRecord = "test"
Expand All @@ -58,7 +56,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
fun testProcedureWithKeyAsMap() {
val db = createDbWithKafkaConfigs()
val topic = UUID.randomUUID().toString()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))
val message = "Hello World"
val keyRecord = mapOf("one" to "Foo", "two" to "Baz", "three" to "Bar")
Expand All @@ -75,7 +72,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
val db = createDbWithKafkaConfigs()
// db.start()
val topic = UUID.randomUUID().toString()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))
val message = "Hello World"
val keyRecord = "test"
Expand All @@ -90,7 +86,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
val db = createDbWithKafkaConfigs()
// db.start()
val topic = UUID.randomUUID().toString()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))
val message = "Hello World"
val keyRecord = "test"
Expand Down Expand Up @@ -209,7 +204,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
it.createTopics(listOf(NewTopic(topic, 5, 1)))
.all()
.get()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))

val message = "Hello World"
Expand Down Expand Up @@ -239,7 +233,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
it.createTopics(listOf(NewTopic(topic, 3, 1)))
.all()
.get()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))

val message = "Hello World"
Expand Down Expand Up @@ -267,7 +260,6 @@ class KafkaEventRouterProcedureTSE : KafkaEventRouterBaseTSE() {
it.createTopics(listOf(NewTopic(topic, 3, 1)))
.all()
.get()
KafkaEventRouterSuiteIT.registerPublishProcedure(db)
kafkaConsumer.subscribe(listOf(topic))

val message = "Hello World"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ class KafkaEventRouterSuiteIT {
kafka.stop()
}, UninitializedPropertyAccessException::class.java)
}

fun registerPublishProcedure(db: GraphDatabaseService) {
// (db as GraphDatabaseAPI).dependencyResolver.resolveDependency(GlobalProcedures::class.java)
// .registerProcedure(PublishProcedures::class.java)
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package apoc.kafka.producer.kafka

import apoc.kafka.common.utils.Neo4jUtilsTest
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
Expand All @@ -9,7 +10,8 @@ class KafkaConfigurationTest {

@Test
fun shouldCreateConfiguration() {
val map = mapOf("apoc.kafka.bootstrap.servers" to "kafka:5678",
val map = mapOf(
Neo4jUtilsTest.KAFKA_BOOTSTRAP_SERVER to "kafka:5678",
"apoc.kafka.acks" to "10",
"apoc.kafka.retries" to 1,
"apoc.kafka.batch.size" to 10,
Expand All @@ -32,7 +34,7 @@ class KafkaConfigurationTest {

val properties = kafkaConfig.asProperties()

assertEquals(map["apoc.kafka.bootstrap.servers"], properties["bootstrap.servers"])
assertEquals(map[Neo4jUtilsTest.KAFKA_BOOTSTRAP_SERVER], properties["bootstrap.servers"])
assertEquals(map["apoc.kafka.acks"], properties["acks"])
assertEquals(map["apoc.kafka.retries"], properties["retries"])
assertEquals(map["apoc.kafka.batch.size"], properties["batch.size"])
Expand Down
7 changes: 5 additions & 2 deletions extra-dependencies/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ def kafkaVersion = "2.4.0"
def jacksonVersion = "2.17.2"

dependencies {
implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '1.8.0'
implementation group: 'org.jetbrains.kotlin', name: 'kotlin-stdlib-jdk8', version: '2.1.0', commonExclusions
implementation group: 'io.ktor', name: 'ktor-jackson', version: '1.6.8', commonExclusions
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion, commonExclusions
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion, commonExclusions
implementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2', commonExclusions

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion, commonExclusions


}

0 comments on commit b47b1ae

Please sign in to comment.