Skip to content

Commit 66a45bd

Browse files
committed
Fixes neo4j-contrib#3799: Apache Kafka procedures
1 parent 7c6254a commit 66a45bd

File tree

111 files changed

+14574
-26
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

111 files changed

+14574
-26
lines changed

extended/build.gradle

+24-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ dependencies {
7272
exclude group: 'org.abego.treelayout'
7373
}
7474

75+
def kotlinVersion = "1.6.0"
76+
def kafkaVersion = "2.4.0"
77+
def jacksonVersion = "2.17.2"
78+
7579
def withoutServers = {
7680
exclude group: 'org.eclipse.jetty'
7781
exclude group: 'org.eclipse.jetty.aggregate'
@@ -115,7 +119,10 @@ dependencies {
115119
}
116120
compileOnly group: 'com.couchbase.client', name: 'java-client', version: '3.3.0', withoutJacksons
117121
compileOnly group: 'io.lettuce', name: 'lettuce-core', version: '6.1.1.RELEASE'
118-
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.14.0', withoutJacksons
122+
compileOnly group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion, withoutJacksons
123+
testImplementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: jacksonVersion, withoutJacksons
124+
compileOnly group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
125+
testImplementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: jacksonVersion
119126
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.270'
120127
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-comprehend', version: '1.12.353' , withoutJacksons
121128
compileOnly group: 'com.sun.mail', name: 'javax.mail', version: '1.6.0'
@@ -128,6 +135,12 @@ dependencies {
128135
compileOnly group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
129136
compileOnly group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
130137

138+
compileOnly group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
139+
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
140+
compileOnly group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
141+
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
142+
143+
testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.16.1'
131144
testImplementation group: 'org.apache.arrow', name: 'arrow-vector', version: '13.0.0'
132145
testImplementation group: 'org.apache.arrow', name: 'arrow-memory-netty', version: '13.0.0'
133146

@@ -155,6 +168,16 @@ dependencies {
155168
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
156169
testImplementation group: 'org.apache.parquet', name: 'parquet-hadoop', version: '1.13.1', withoutServers
157170
testImplementation group: 'com.opencsv', name: 'opencsv', version: '5.7.1'
171+
testImplementation group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.4.2'
172+
// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit', version: kotlinVersion
173+
// testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test-junit5', version: kotlinVersion
174+
175+
testImplementation group: 'org.jetbrains.kotlin', name: 'kotlin-test', version: '1.6.0'
176+
177+
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: kafkaVersion
178+
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.2.2'
179+
testImplementation group: 'org.testcontainers', name: 'kafka', version: testContainersVersion
180+
testImplementation group: 'com.github.conker84', name: 'neo4j-configuration-lifecycle', version: 'ad59084711'
158181

159182
configurations.all {
160183
exclude group: 'org.slf4j', module: 'slf4j-nop'

extended/src/main/java/apoc/ExtendedApocConfig.java

+20
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class ExtendedApocConfig extends LifecycleAdapter
4444
public static final String APOC_ML_WATSON_URL = "apoc.ml.watson.url";
4545
public static final String APOC_AWS_KEY_ID = "apoc.aws.key.id";
4646
public static final String APOC_AWS_SECRET_KEY = "apoc.aws.secret.key";
47+
public static final String APOC_KAFKA_ENABLED = "apoc.kafka.enabled";
4748
public enum UuidFormatType { hex, base64 }
4849

4950
// These were earlier added via the Neo4j config using the ApocSettings.java class
@@ -73,6 +74,25 @@ public enum UuidFormatType { hex, base64 }
7374

7475
public static final String CONFIG_DIR = "config-dir=";
7576

77+
private static final String CONF_DIR_ARG = "config-dir=";
78+
private static final String SOURCE_ENABLED = "apoc.kafka.source.enabled";
79+
private static final boolean SOURCE_ENABLED_VALUE = true;
80+
private static final String PROCEDURES_ENABLED = "apoc.kafka.procedures.enabled";
81+
private static final boolean PROCEDURES_ENABLED_VALUE = true;
82+
private static final String SINK_ENABLED = "apoc.kafka.sink.enabled";
83+
private static final boolean SINK_ENABLED_VALUE = false;
84+
private static final String CHECK_APOC_TIMEOUT = "apoc.kafka.check.apoc.timeout";
85+
private static final String CHECK_APOC_INTERVAL = "apoc.kafka.check.apoc.interval";
86+
private static final String CLUSTER_ONLY = "apoc.kafka.cluster.only";
87+
private static final String CHECK_WRITEABLE_INSTANCE_INTERVAL = "apoc.kafka.check.writeable.instance.interval";
88+
private static final String SYSTEM_DB_WAIT_TIMEOUT = "apoc.kafka.systemdb.wait.timeout";
89+
private static final long SYSTEM_DB_WAIT_TIMEOUT_VALUE = 10000L;
90+
private static final String POLL_INTERVAL = "apoc.kafka.sink.poll.interval";
91+
private static final String INSTANCE_WAIT_TIMEOUT = "apoc.kafka.wait.timeout";
92+
private static final long INSTANCE_WAIT_TIMEOUT_VALUE = 120000L;
93+
private static final int DEFAULT_TRIGGER_PERIOD = 10000;
94+
private static final String DEFAULT_PATH = ".";
95+
7696
public ExtendedApocConfig(LogService log, GlobalProcedures globalProceduresRegistry, String defaultConfigPath) {
7797
this.log = log.getInternalLog(ApocConfig.class);
7898
this.defaultConfigPath = defaultConfigPath;

extended/src/main/java/apoc/ExtendedApocGlobalComponents.java

+48-18
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,19 @@
1212
import org.neo4j.kernel.availability.AvailabilityListener;
1313
import org.neo4j.kernel.internal.GraphDatabaseAPI;
1414
import org.neo4j.kernel.lifecycle.Lifecycle;
15+
import org.neo4j.logging.Log;
1516

17+
import java.util.ArrayList;
18+
import java.util.Arrays;
1619
import java.util.Collection;
1720
import java.util.Collections;
21+
import java.util.HashMap;
1822
import java.util.List;
1923
import java.util.Map;
2024
import java.util.concurrent.ConcurrentHashMap;
2125

26+
import static apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED;
27+
2228
@ServiceProvider
2329
public class ExtendedApocGlobalComponents implements ApocGlobalComponents {
2430

@@ -37,36 +43,60 @@ public Map<String, Lifecycle> getServices(GraphDatabaseAPI db, ApocExtensionFact
3743
);
3844
cypherProcedureHandlers.put(db, cypherProcedureHandler);
3945

40-
return Map.of(
46+
Map<String, Lifecycle> serviceMap = new HashMap<>();
47+
serviceMap.put("ttl", new TTLLifeCycle(dependencies.scheduler(),
48+
db,
49+
TTLConfig.ttlConfig(),
50+
dependencies.log().getUserLog(TTLLifeCycle.class)));
4151

42-
"ttl", new TTLLifeCycle(dependencies.scheduler(),
43-
db,
44-
TTLConfig.ttlConfig(),
45-
dependencies.log().getUserLog(TTLLifeCycle.class)),
52+
serviceMap.put("uuid", new UuidHandler(db,
53+
dependencies.databaseManagementService(),
54+
dependencies.log().getUserLog(Uuid.class),
55+
dependencies.apocConfig(),
56+
dependencies.scheduler(),
57+
dependencies.pools()));
4658

47-
"uuid", new UuidHandler(db,
48-
dependencies.databaseManagementService(),
49-
dependencies.log().getUserLog(Uuid.class),
50-
dependencies.apocConfig(),
51-
dependencies.scheduler(),
52-
dependencies.pools()),
59+
serviceMap.put("directory", new LoadDirectoryHandler(db,
60+
dependencies.log().getUserLog(LoadDirectory.class),
61+
dependencies.pools()));
5362

54-
"directory", new LoadDirectoryHandler(db,
55-
dependencies.log().getUserLog(LoadDirectory.class),
56-
dependencies.pools()),
63+
serviceMap.put("cypherProcedures", cypherProcedureHandler);
64+
65+
if (dependencies.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
66+
try {
67+
Class<?> kafkaHandlerClass = Class.forName("apoc.kafka.KafkaHandler");
68+
Lifecycle kafkaHandler = (Lifecycle) kafkaHandlerClass
69+
.getConstructor(GraphDatabaseAPI.class, Log.class)
70+
.newInstance(db, dependencies.log().getUserLog(kafkaHandlerClass));
71+
72+
serviceMap.put("kafkaHandler", kafkaHandler);
73+
} catch (Exception e) {
74+
dependencies.log().getUserLog(ExtendedApocGlobalComponents.class)
75+
.warn("""
76+
Cannot find the Kafka extra jar.
77+
Please put the apoc-kafka-dependencies-5.x.x-all.jar into plugin folder.
78+
See the documentation: https://neo4j.com/labs/apoc/5/overview/apoc.kafka""");
79+
}
80+
}
81+
82+
return serviceMap;
5783

58-
"cypherProcedures", cypherProcedureHandler
59-
);
6084
}
6185

6286
@Override
6387
public Collection<Class> getContextClasses() {
64-
return List.of(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class);
88+
List<Class> contextClasses = new ArrayList<>(
89+
Arrays.asList(CypherProceduresHandler.class, UuidHandler.class, LoadDirectoryHandler.class)
90+
);
91+
try {
92+
contextClasses.add(Class.forName("apoc.kafka.KafkaHandler"));
93+
} catch (ClassNotFoundException ignored) {}
94+
return contextClasses;
6595
}
6696

6797
@Override
6898
public Iterable<AvailabilityListener> getListeners(GraphDatabaseAPI db, ApocExtensionFactory.Dependencies dependencies) {
6999
CypherProceduresHandler cypherProceduresHandler = cypherProcedureHandlers.get(db);
70100
return cypherProceduresHandler==null ? Collections.emptyList() : Collections.singleton(cypherProceduresHandler);
71101
}
72-
}
102+
}

extended/src/main/java/apoc/generate/Generate.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void complete(@Name("noNodes") Long noNodes, @Name("label") String label,
7070
@Procedure(name = "apoc.generate.simple",mode = Mode.WRITE)
7171
@Description("apoc.generate.simple(degrees, label, type) - generates a simple random graph according to the given degree distribution")
7272
public void simple(@Name("degrees") List<Long> degrees, @Name("label") String label, @Name("type") String relationshipType) throws IOException {
73-
if (degrees == null) degrees = Arrays.asList(2L, 2L, 2L, 2L);
73+
if (degrees == null) degrees = java.util.Arrays.asList(2L, 2L, 2L, 2L);
7474

7575
List<Integer> intDegrees = degrees.stream().map(Long::intValue).collect(Collectors.toList());
7676

extended/src/main/java/apoc/load/Jdbc.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ma
103103
}
104104
}
105105

106-
@Procedure(mode = Mode.DBMS)
106+
@Procedure(mode = Mode.WRITE)
107107
@Description("apoc.load.jdbcUpdate('key or url','statement',[params],config) YIELD row - update relational database, from a SQL statement with optional parameters")
108108
public Stream<RowResult> jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
109109
log.info( String.format( "Executing SQL update: %s", query ) );
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package apoc.kafka
2+
3+
import apoc.ApocConfig
4+
import apoc.ExtendedApocConfig.APOC_KAFKA_ENABLED
5+
import apoc.kafka.config.StreamsConfig
6+
import apoc.kafka.consumer.StreamsSinkConfigurationListener
7+
import apoc.kafka.producer.StreamsRouterConfigurationListener
8+
import org.neo4j.kernel.internal.GraphDatabaseAPI
9+
import org.neo4j.kernel.lifecycle.LifecycleAdapter
10+
import org.neo4j.logging.Log
11+
12+
/**
 * Lifecycle component that wires APOC's Kafka integration into the database lifecycle.
 *
 * Instantiated reflectively (Class.forName("apoc.kafka.KafkaHandler") with a
 * (GraphDatabaseAPI, Log) constructor) by ExtendedApocGlobalComponents, and only
 * registered when the `apoc.kafka.enabled` config flag is set.
 *
 * start()/stop() each re-check APOC_KAFKA_ENABLED before doing any work, so the
 * handler is a no-op when the feature flag is off.
 */
class KafkaHandler(): LifecycleAdapter() {
13+
14+
// lateinit because the no-arg constructor is required for the reflective
// instantiation path; the secondary constructor below populates both fields
// before the lifecycle methods run.
private lateinit var db: GraphDatabaseAPI
15+
private lateinit var log: Log
16+
17+
// Secondary constructor used by ExtendedApocGlobalComponents via
// getConstructor(GraphDatabaseAPI.class, Log.class).newInstance(db, log).
constructor(db: GraphDatabaseAPI, log: Log) : this() {
18+
this.db = db
19+
this.log = log
20+
}
21+
22+
// Starts the Kafka source (router) and sink listeners, each guarded by its own
// try/catch so a failure in one does not prevent the other from starting.
override fun start() {
23+
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
24+
// println("start db......")
25+
26+
try {
27+
StreamsRouterConfigurationListener(db, log)
28+
.start(StreamsConfig.getConfiguration())
29+
} catch (e: Exception) {
30+
// NOTE(review): Neo4j's Log is printf-style ("%s"); confirm the "{}"
// placeholder is actually substituted here rather than printed literally.
log.error("Exception in StreamsRouterConfigurationListener {}", e.message)
31+
}
32+
33+
try {
34+
StreamsSinkConfigurationListener(db, log)
35+
.start(StreamsConfig.getConfiguration())
36+
} catch (e: Exception) {
37+
// NOTE(review): same "{}" vs "%s" formatting concern as above.
log.error("Exception in StreamsSinkConfigurationListener {}", e.message)
38+
}
39+
}
40+
}
41+
42+
// Shuts the listeners down when the feature flag is enabled.
override fun stop() {
43+
if(ApocConfig.apocConfig().getBoolean(APOC_KAFKA_ENABLED)) {
44+
// println("stop db..........")
45+
46+
// NOTE(review): these are *new* listener instances, not the ones created in
// start() — verify that shutdown() on a freshly constructed listener actually
// releases the resources acquired at start (otherwise the start()-time
// instances should be retained in fields and shut down here).
StreamsRouterConfigurationListener(db, log).shutdown()
47+
StreamsSinkConfigurationListener(db, log).shutdown()
48+
}
49+
}
50+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//package apoc.kafka
2+
//
3+
//import apoc.kafka.consumer.StreamsSinkConfiguration
4+
//import apoc.kafka.consumer.StreamsTopicService
5+
//import apoc.kafka.extensions.isDefaultDb
6+
//import apoc.kafka.service.StreamsStrategyStorage
7+
//import apoc.kafka.service.TopicType
8+
//import apoc.kafka.service.sink.strategy.*
9+
//import org.neo4j.graphdb.GraphDatabaseService
10+
//
11+
//class Neo4jStreamsStrategyStorage(private val streamsTopicService: StreamsTopicService,
12+
// private val streamsConfig: Map<String, String>,
13+
// private val db: GraphDatabaseService): StreamsStrategyStorage() {
14+
//
15+
// override fun getTopicType(topic: String): TopicType? {
16+
// return streamsTopicService.getTopicType(topic)
17+
// }
18+
//
19+
// private fun <T> getTopicsByTopicType(topicType: TopicType): T = streamsTopicService.getByTopicType(topicType) as T
20+
//
21+
// override fun getStrategy(topic: String): IngestionStrategy = when (val topicType = getTopicType(topic)) {
22+
// TopicType.CDC_SOURCE_ID -> {
23+
// val strategyConfig = StreamsSinkConfiguration
24+
// .createSourceIdIngestionStrategyConfig(streamsConfig, db.databaseName(), db.isDefaultDb())
25+
// SourceIdIngestionStrategy(strategyConfig)
26+
// }
27+
// TopicType.CDC_SCHEMA -> SchemaIngestionStrategy()
28+
// TopicType.CUD -> CUDIngestionStrategy()
29+
// TopicType.PATTERN_NODE -> {
30+
// val map = getTopicsByTopicType<Map<String, NodePatternConfiguration>>(topicType)
31+
// NodePatternIngestionStrategy(map.getValue(topic))
32+
// }
33+
// TopicType.PATTERN_RELATIONSHIP -> {
34+
// val map = getTopicsByTopicType<Map<String, RelationshipPatternConfiguration>>(topicType)
35+
// RelationshipPatternIngestionStrategy(map.getValue(topic))
36+
// }
37+
// TopicType.CYPHER -> {
38+
// CypherTemplateStrategy(streamsTopicService.getCypherTemplate(topic)!!)
39+
// }
40+
// else -> throw RuntimeException("Topic Type not Found")
41+
// }
42+
//
43+
//}

0 commit comments

Comments
 (0)