From 2c38a5a06a7d6bd93839187134d8d7ffb96dcf07 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Thu, 1 Aug 2024 22:20:31 -0300 Subject: [PATCH 1/7] wip examples 23.4 --- examples/consumer/pom.xml | 82 --------------------------------------- examples/producer/pom.xml | 82 --------------------------------------- 2 files changed, 164 deletions(-) delete mode 100644 examples/consumer/pom.xml delete mode 100644 examples/producer/pom.xml diff --git a/examples/consumer/pom.xml b/examples/consumer/pom.xml deleted file mode 100644 index 684d740..0000000 --- a/examples/consumer/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ - - - - 4.0.0 - - org.oracle.okafka.examples - consumer - 23.4.0.0 - - consumer - https://github.com/oracle/okafka - - - UTF-8 - UTF-8 - ${java.version} - ${java.version} - 11 - 23.4.0.0 - - - - - com.oracle.database.messaging - okafka - 23.4.0.0 - - - junit - junit - 4.13.2 - test - - - - - - - - - maven-clean-plugin - 3.1.0 - - - - maven-resources-plugin - 3.0.2 - - - maven-compiler-plugin - 3.8.0 - - - maven-surefire-plugin - 2.22.1 - - - maven-jar-plugin - 3.0.2 - - - maven-install-plugin - 2.5.2 - - - maven-deploy-plugin - 2.8.2 - - - - maven-site-plugin - 3.7.1 - - - maven-project-info-reports-plugin - 3.0.0 - - - - - \ No newline at end of file diff --git a/examples/producer/pom.xml b/examples/producer/pom.xml deleted file mode 100644 index 7f54b0c..0000000 --- a/examples/producer/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ - - - - 4.0.0 - - org.oracle.okafka.examples - producer - 0.8 - - producer - https://github.com/oracle/okafka - - - UTF-8 - UTF-8 - ${java.version} - ${java.version} - 11 - 0.8 - - - - - org.oracle.okafka - okafka - 0.8 - - - junit - junit - 4.13.2 - test - - - - - - - - - - maven-clean-plugin - 3.1.0 - - - - maven-resources-plugin - 3.0.2 - - - maven-compiler-plugin - 3.8.0 - - - maven-surefire-plugin - 2.22.1 - - - maven-jar-plugin - 3.0.2 - - - maven-install-plugin - 2.5.2 - - - maven-deploy-plugin - 2.8.2 - - - - maven-site-plugin - 3.7.1 - - - maven-project-info-reports-plugin - 3.0.0 - - - - - \ No newline at end of file From 4f73cae3310fd6299a3410317caed8fd3bab11e4 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Thu, 1 Aug 2024 22:31:28 -0300 Subject: [PATCH 2/7] wip examples 23.4 prd/cons --- build.gradle | 97 +++++++-------- .../okafka/examples/ConsumerOKafka.java | 31 +++-- .../src/main/resources/config.properties | 11 +- .../src/main/resources/ojdbc.properties | 2 + .../okafka/examples/ProducerOKafka.java | 111 +++++++++++------- .../src/main/resources/config.properties | 11 +- .../src/main/resources/ojdbc.properties | 2 + 7 files changed, 159 insertions(+), 106 deletions(-) create mode 100644 examples/consumer/src/main/resources/ojdbc.properties create mode 100644 examples/producer/src/main/resources/ojdbc.properties diff --git a/build.gradle b/build.gradle index 1591b4f..18cea7c 100644 --- a/build.gradle +++ b/build.gradle @@ -28,12 +28,12 @@ allprojects { options.windowTitle = "Oracle Database Transactional Event Queues Java API Reference" options.header = """Oracle® Database Transactional Event Queues Java API Reference
23ai

FF46992-04
""" options.bottom = """
Copyright © 2001, 2024, Oracle and/or its affiliates. All rights reserved.


""" - } + } } ext { - gradleVersion = '7.3' - minJavaVersion = JavaVersion.VERSION_1_8 + gradleVersion = '8.8' + minJavaVersion = JavaVersion.VERSION_17 mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : '' mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : '' @@ -42,41 +42,41 @@ ext { project(':clients') { apply plugin : 'java-library' - + sourceCompatibility = minJavaVersion targetCompatibility = minJavaVersion - + sourceSets { - main { - java { - srcDir 'src/main/java' - exclude 'tests/**' - exclude 'test/**' - } - } - } + main { + java { + srcDir 'src/main/java' + exclude 'tests/**' + exclude 'test/**' + } + } + } - println 'Creating okafka-23.4.0.0.jar' + println 'Building okafka 23.4.0.0 Java API jar' dependencies { - // These dependencies are used by the application. - implementation group: 'com.oracle.database.jdbc', name: 'ojdbc11', version: '23.4.0.24.05' - implementation group: 'com.oracle.database.messaging', name: 'aqapi', version: '23.3.0.0' - implementation group: 'javax.transaction', name: 'jta', version: '1.1' - implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0' - implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' - implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0' - implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' - // Use JUnit test framework - implementation group: 'junit', name: 'junit', version: '4.12' - - // Test dependencies - testImplementation group: 'org.easymock', name: 'easymock', version: '3.6' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5' - testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5' - testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5' - + // These dependencies are used by the application. + implementation group: 'com.oracle.database.jdbc', name: 'ojdbc11', version: '23.4.0.24.05' + implementation group: 'com.oracle.database.messaging', name: 'aqapi', version: '23.3.0.0' + implementation group: 'javax.transaction', name: 'jta', version: '1.1' + implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0' + implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' + implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0' + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' + // Use JUnit test framework + implementation group: 'junit', name: 'junit', version: '4.12' + + // Test dependencies + testImplementation group: 'org.easymock', name: 'easymock', version: '3.6' + testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5' + testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5' + testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5' + } javadoc { @@ -87,18 +87,21 @@ project(':clients') { } tasks.named('jar') { - description('Creating JAR okafka-23.4.0.0.jar ') + description('Generates okafka 23.4.0.0 API jar ') archiveBaseName = 'okafka' archiveVersion = '23.4.0.0' + from "${rootProject.projectDir}/LICENSE.txt" from "${rootProject.projectDir}/NOTICE" - manifest { - attributes ( - 'Version': '23.4.0.0', - 'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss") - ) - } + manifest { + attributes ( + 'Implementation-Title' : 'okafka', + 'Implementation-Version': project.version, + 'Version': '23.4.0.0', + 'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss") + ) + } } tasks.register('fullJar', Jar) { @@ -107,7 +110,7 @@ project(':clients') { manifest { attributes( 'Implementation-Title' : 'okafka', - 'Implementation-Version': project.version) + 'Implementation-Version': project.version) } from "${rootProject.projectDir}/LICENSE.txt" @@ -137,6 +140,7 @@ project(':examples:consumer') { implementation project(':clients') implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' + implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' } @@ -150,14 +154,14 @@ project(':examples:consumer') { manifest { attributes( 'Implementation-Title' : 'okafka consumer', - 'Implementation-Version': project.version) + 'Implementation-Version': project.version) } } tasks.named('run') { - description('Run okafka client consumer') + description('Run okafka client simple consumer') application { - mainClass = 'org.oracle.okafka.examples.Consumer' + mainClass = 'org.oracle.okafka.examples.ConsumerOKafka' } } } @@ -174,10 +178,11 @@ project(':examples:producer') { implementation project(':clients') implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' + implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' } tasks.named('jar') { - description('Generates okafka client producer jar ') + description('Generates okafka client simple producer jar ') archiveBaseName = 'okafka' archiveClassifier = 'producer' @@ -186,14 +191,14 @@ project(':examples:producer') { manifest { attributes( 'Implementation-Title' : 'okafka producer', - 'Implementation-Version': project.version) + 'Implementation-Version': project.version) } } tasks.named('run') { - description('Run okafka client producer') + description('Run okafka client simple producer') application { - mainClass = 'org.oracle.okafka.examples.Producer' + mainClass = 'org.oracle.okafka.examples.ProducerOKafka' } } } diff --git a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java index 558bd75..bdd75b0 100644 --- a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java +++ b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java @@ -7,22 +7,39 @@ package org.oracle.okafka.examples; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.util.Properties; import java.time.Duration; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class ConsumerOKafka { public static void main(String[] args) { - Properties props = new getProperties(); - String topic = props.getProperty("topic.name", "TXEQ"); - props.remove("topic.name"); // Pass props to build OKafkaConsumer - KafkaConsumer consumer = new KafkaConsumer(props); + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + + // Get application properties + Properties appProperties = null; + try { + appProperties = getProperties(); + if (appProperties == null) { + System.out.println("Application properties not found!"); + System.exit(-1); + } + } catch (Exception e) { + System.out.println("Application properties not found!"); + System.out.println("Exception: " + e); + System.exit(-1); + } + + String topic = appProperties.getProperty("topic.name", "TXEQ"); + appProperties.remove("topic.name"); // Pass props to build OKafkaProducer + + KafkaConsumer consumer = new KafkaConsumer(appProperties); consumer.subscribe(Arrays.asList(topic)); while(true) { @@ -58,7 +75,7 @@ private static java.util.Properties getProperties() throws IOException { try { Properties prop = new Properties(); String propFileName = "config.properties"; - inputStream = new FileInPutStream(propFileName); + inputStream = ConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName); if (inputStream != null) { prop.load(inputStream); } else { diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties index 5069223..e45f2dd 100644 --- a/examples/consumer/src/main/resources/config.properties +++ b/examples/consumer/src/main/resources/config.properties @@ -5,18 +5,19 @@ bootstrap.servers= oracle.service.name= oracle.net.tns_admin= + #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet #security.protocol=SSL #oracle.net.tns_admin= #tns.alias= # Application specific OKafka consumer properties +topic.name= group.id= -enable.auto.commit=false -max.poll.records=100 +enable.auto.commit=true +max.poll.records=1000 -key.deserializer=org.oracle.okafka.common.serialization.StringDeserializer -value.deserializer=org.oracle.okafka.common.serialization.StringDeserializer +key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer -topic.name= diff --git a/examples/consumer/src/main/resources/ojdbc.properties b/examples/consumer/src/main/resources/ojdbc.properties new file mode 100644 index 0000000..14f7246 --- /dev/null +++ b/examples/consumer/src/main/resources/ojdbc.properties @@ -0,0 +1,2 @@ +user= +password= \ No newline at end of file diff --git a/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java b/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java index 32a860c..1cc464f 100644 --- a/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java +++ b/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java @@ -8,10 +8,14 @@ package org.oracle.okafka.examples; import org.oracle.okafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.Future; @@ -19,65 +23,82 @@ public class ProducerOKafka { public static void main(String[] args) { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + + // Get application properties + Properties appProperties = null; + try { + appProperties = getProperties(); + if (appProperties == null) { + System.out.println("Application properties not found!"); + System.exit(-1); + } + } catch (Exception e) { + System.out.println("Application properties not found!"); + System.out.println("Exception: " + e); + System.exit(-1); + } + + String topic = appProperties.getProperty("topic.name", "TXEQ"); + appProperties.remove("topic.name"); // Pass props to build OKafkaProducer + + Producer producer = new KafkaProducer<>(appProperties); + + String baseMsg = "This is test with 128 characters Payload used to test Oracle Kafka. "+ + "Read https://github.com/oracle/okafka/blob/master/README.md"; + + Future lastFuture = null; + int msgCnt = 10; + String key = "Just some key for OKafka"; + ArrayList> metadataList = new ArrayList<>(); + try { - Properties props = new getProperties(); - String topic = props.getProperty("topic.name", "TXEQ"); - props.remove("topic.name"); // Pass props to build OKafkaProducer - Producer producer = new KafkaProducer(props); - - String baseMsg = "This is test with 128 characters Payload used to test Oracle Kafka. "+ - "Read https://github.com/oracle/okafka/blob/master/README.md"; - - - Future lastFuture = null; - int msgCnt = 10; - String key = "Just some key for OKafka"; - ArrayList> metadataList = new ArrayList>(); - for(int i=0;i producerRecord = - new ProducerRecord(topic, key+i, i+ baseMsg); + RecordHeader rH2 = new RecordHeader("REPLY_TO", "TXEQ_2".getBytes()); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, key+i, i+ baseMsg); producerRecord.headers().add(rH1).add(rH2); lastFuture = producer.send(producerRecord); metadataList.add(lastFuture); } RecordMetadata rd = lastFuture.get(); System.out.println("Last record placed in " + rd.partition() + " Offset " + rd.offset()); - + } + catch(Exception e) { + System.out.println("Failed to send messages:"); + e.printStackTrace(); + } + finally { System.out.println("Initiating close"); producer.close(); - } - catch(Exception e) - { - System.out.println("Exception in Main " + e ); - e.printStackTrace(); } + } - - private static java.util.Properties getProperties() throws IOException { - InputStream inputStream = null; - Properties appProperties = null; - try { - Properties prop = new Properties(); - String propFileName = "config.properties"; - inputStream = new FileInPutStream(propFileName); - if (inputStream != null) { - prop.load(inputStream); - } else { - throw new FileNotFoundException("property file '" + propFileName + "' not found."); - } - appProperties = prop; + private static java.util.Properties getProperties() throws IOException { + InputStream inputStream = null; + Properties appProperties; - } catch (Exception e) { - System.out.println("Exception: " + e); - throw e; - } finally { - inputStream.close(); - } - return appProperties; - } + try { + Properties prop = new Properties(); + String propFileName = "config.properties"; + inputStream = ProducerOKafka.class.getClassLoader().getResourceAsStream(propFileName); + if (inputStream != null) { + prop.load(inputStream); + } else { + throw new FileNotFoundException("property file '" + propFileName + "' not found."); + } + appProperties = prop; + + } catch (Exception e) { + System.out.println("Exception: " + e); + throw e; + } finally { + if (inputStream != null) + inputStream.close(); + } + return appProperties; + } } diff --git a/examples/producer/src/main/resources/config.properties b/examples/producer/src/main/resources/config.properties index 4f015c2..0fbbbb2 100644 --- a/examples/producer/src/main/resources/config.properties +++ b/examples/producer/src/main/resources/config.properties @@ -5,15 +5,20 @@ bootstrap.servers= oracle.service.name= oracle.net.tns_admin= + + #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet #security.protocol=SSL #oracle.net.tns_admin= #tns.alias= #Appliction specific OKafka Producer properties - batch.size=200 linger.ms=100 -key.serializer=org.oracle.okafka.common.serialization.StringSerializer -value.serializer=org.oracle.okafka.common.serialization.StringSerializer +buffer.memory=335544 + +enable.idempotence=true +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer + topic.name= \ No newline at end of file diff --git a/examples/producer/src/main/resources/ojdbc.properties b/examples/producer/src/main/resources/ojdbc.properties new file mode 100644 index 0000000..14f7246 --- /dev/null +++ b/examples/producer/src/main/resources/ojdbc.properties @@ -0,0 +1,2 @@ +user= +password= \ No newline at end of file From 07f455b066eb0919f795bf78b3e0751b715abbe8 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Mon, 5 Aug 2024 22:41:16 -0300 Subject: [PATCH 3/7] wip examples 23.4 tx prod/tx cons --- build.gradle | 79 ++++++++ .../examples/TransactionalConsumerOKafka.java | 172 ++++++++++++++++++ .../src/main/resources/config.properties | 36 ++++ .../src/main/resources/ojdbc.properties | 2 + .../examples/TransactionalProducerOKafka.java | 139 ++++++++++++++ .../src/main/resources/config.properties | 33 ++++ .../src/main/resources/ojdbc.properties | 2 + .../okafka/examples/ConsumerOKafka.java | 9 +- .../src/main/resources/config.properties | 20 +- .../src/main/resources/ojdbc.properties | 4 +- .../src/main/resources/config.properties | 13 +- .../src/main/resources/ojdbc.properties | 4 +- settings.gradle | 5 +- 13 files changed, 499 insertions(+), 19 deletions(-) create mode 100644 examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java create mode 100644 examples/TxConsumer/src/main/resources/config.properties create mode 100644 examples/TxConsumer/src/main/resources/ojdbc.properties create mode 100644 examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java create mode 100644 examples/TxProducer/src/main/resources/config.properties create mode 100644 examples/TxProducer/src/main/resources/ojdbc.properties diff --git a/build.gradle b/build.gradle index 18cea7c..9f910e8 100644 --- a/build.gradle +++ b/build.gradle @@ -203,6 +203,85 @@ project(':examples:producer') { } } +project(':examples:TxProducer') { + apply plugin : 'java' + apply plugin : 'application' + + sourceCompatibility = minJavaVersion + targetCompatibility = minJavaVersion + + dependencies { + // These dependencies are used by the application. + implementation project(':clients') + implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' + implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' + } + + tasks.named('jar') { + description('Generates OKafka Transactional Producer jar ') + archiveBaseName = 'okafka' + archiveClassifier = 'txproducer' + + from "${rootProject.projectDir}/LICENSE.txt" + from "${rootProject.projectDir}/NOTICE" + + manifest { + attributes( + 'Implementation-Title' : 'OKafka Transactional Producer', + 'Implementation-Version': project.version + ) + } + } + + tasks.named('run') { + description('Run OKafka Transactional Producer') + application { + mainClass = 'org.oracle.okafka.examples.TransactionalProducerOKafka' + } + } +} + +project(':examples:TxConsumer') { + apply plugin : 'java' + apply plugin : 'application' + + sourceCompatibility = minJavaVersion + targetCompatibility = minJavaVersion + + dependencies { + // These dependencies are used by the application. + implementation project(':clients') + implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' + implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' + implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' + } + + + tasks.named('jar') { + description('Generates OKafka Transactional Consumer jar ') + archiveBaseName = 'okafka' + archiveClassifier = 'txconsumer' + + from "${rootProject.projectDir}/LICENSE.txt" + from "${rootProject.projectDir}/NOTICE" + + manifest { + attributes( + 'Implementation-Title' : 'OKafka Transactional Consumer', + 'Implementation-Version': project.version + ) + } + } + + tasks.named('run') { + description('Run OKafka Transactional Consumer') + application { + mainClass = 'org.oracle.okafka.examples.TransactionalConsumerOKafka' + } + } +} + configurations { childJar } diff --git a/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java b/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java new file mode 100644 index 0000000..efd0d9e --- /dev/null +++ b/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java @@ -0,0 +1,172 @@ +/* + ** OKafka Java Client version 23.4. + ** + ** Copyright (c) 2019, 2024 Oracle and/or its affiliates. + ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. + */ + +package org.oracle.okafka.examples; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; +import java.sql.Connection; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import org.oracle.okafka.clients.consumer.KafkaConsumer; + + +public class TransactionalConsumerOKafka { + + // Dummy implementation of ConsumerRebalanceListener interface + // It only maintains the list of assigned partitions in assignedPartitions list + static class ConsumerRebalance implements ConsumerRebalanceListener { + + public List assignedPartitions = new ArrayList(); + + @Override + public synchronized void onPartitionsAssigned(Collection partitions) { + System.out.println("Newly Assigned Partitions:"); + for (TopicPartition tp :partitions ) { + System.out.println(tp); + assignedPartitions.add(tp); + } + } + + @Override + public synchronized void onPartitionsRevoked(Collection partitions) { + System.out.println("Revoked previously assigned partitions. "); + for (TopicPartition tp :assignedPartitions ) { + System.out.println(tp); + } + assignedPartitions.clear(); + } + } + + public static void main(String[] args) { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + + // Get application properties + Properties appProperties = null; + try { + appProperties = getProperties(); + if (appProperties == null) { + System.out.println("Application properties not found!"); + System.exit(-1); + } + } catch (Exception e) { + System.out.println("Application properties not found!"); + System.out.println("Exception: " + e); + System.exit(-1); + } + + String topic = appProperties.getProperty("topic.name", "TXEQ"); + appProperties.remove("topic.name"); // Pass props to build OKafkaProducer + + Consumer consumer = new KafkaConsumer<>(appProperties); + ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); + + consumer.subscribe(Arrays.asList(topic), rebalanceListener); + + int expectedMsgCnt = 100; + int msgCnt = 0; + Connection conn = null; + boolean fail = false; + try { + while(true) { + try { + //Consumes records from the assigned partitions of 'TXEQ' topic + ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); + + if (records.count() > 0 ) + { + conn = ((KafkaConsumer)consumer).getDBConnection(); + fail = false; + for (ConsumerRecord record : records) + { + System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); + for(Header h: record.headers()) + { + System.out.println("Header: " +h.toString()); + } + try { + processRecord(conn, record); + } catch(Exception e) { + fail = true; + break; + } + } + if(fail){ + conn.rollback(); + } + else { + msgCnt += records.count(); + consumer.commitSync(); + } + + if(msgCnt >= (expectedMsgCnt )) { + System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); + break; + } + } + else { + System.out.println("No Record Fetched. Retrying in 1 second"); + Thread.sleep(1000); + } + }catch(Exception e) + { + System.out.println("Exception while consuming messages: " + e.getMessage()); + throw e; + } + } + }catch(Exception e) + { + System.out.println("Exception from OKafka consumer " + e); + e.printStackTrace(); + }finally { + System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records."); + consumer.close(); + } + } + + private static void processRecord(Connection conn, ConsumerRecord record) + { + //Application specific logic to process the message + } + + private static java.util.Properties getProperties() throws IOException { + InputStream inputStream = null; + Properties appProperties = null; + + try { + Properties prop = new Properties(); + String propFileName = "config.properties"; + inputStream = TransactionalConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName); + if (inputStream != null) { + prop.load(inputStream); + } else { + throw new FileNotFoundException("property file '" + propFileName + "' not found."); + } + appProperties = prop; + + } catch (Exception e) { + System.out.println("Exception: " + e); + throw e; + } finally { + inputStream.close(); + } + return appProperties; + } +} \ No newline at end of file diff --git a/examples/TxConsumer/src/main/resources/config.properties b/examples/TxConsumer/src/main/resources/config.properties new file mode 100644 index 0000000..e7a9253 --- /dev/null +++ b/examples/TxConsumer/src/main/resources/config.properties @@ -0,0 +1,36 @@ +# okafka consumer example properties + +#Properties to connect to Oracle Database +#Option 1: Connect to Oracle database using plaintext +#bootstrap.servers= +#oracle.service.name= +#oracle.net.tns_admin= +security.protocol=PLAINTEXT +bootstrap.servers=84.235.172.160:1521 +oracle.service.name="freepdb1" +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/TxConsumer/src/main/resources + + +#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet +#security.protocol=SSL +#oracle.net.tns_admin= +#tns.alias= + +# Application specific OKafka consumer properties +#topic.name= +topic.name=TXEQ +#group.id= +group.id=CG1 + +enable.auto.commit=false +# Maximum number of records fetched in single poll call +max.poll.records=10 + +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + + + + + + diff --git a/examples/TxConsumer/src/main/resources/ojdbc.properties b/examples/TxConsumer/src/main/resources/ojdbc.properties new file mode 100644 index 0000000..6a7530d --- /dev/null +++ b/examples/TxConsumer/src/main/resources/ojdbc.properties @@ -0,0 +1,2 @@ +user=okafka_user +password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java b/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java new file mode 100644 index 0000000..980490c --- /dev/null +++ b/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java @@ -0,0 +1,139 @@ +/* + ** OKafka Java Client version 23.4. + ** + ** Copyright (c) 2019, 2024 Oracle and/or its affiliates. + ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. + */ + +package org.oracle.okafka.examples; + +import org.oracle.okafka.clients.producer.KafkaProducer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.header.internals.RecordHeader; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.sql.Connection; +import java.util.Properties; + +public class TransactionalProducerOKafka { + public static void main(String[] args) { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + + // Get application properties + Properties appProperties = null; + try { + appProperties = getProperties(); + if (appProperties == null) { + System.out.println("Application properties not found!"); + System.exit(-1); + } + } catch (Exception e) { + System.out.println("Application properties not found!"); + System.out.println("Exception: " + e); + System.exit(-1); + } + + Producer producer = null; + + try { + String topic = appProperties.getProperty("topic.name", "TXEQ"); + appProperties.remove("topic.name"); // Pass props to build OKafkaProducer + + producer = new KafkaProducer<>(appProperties); + + int msgCnt = 100; + String jsonPayload = "{\"name\":\"Programmer"+msgCnt+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}"; + System.out.println(jsonPayload); + producer.initTransactions(); + + Connection conn = ((KafkaProducer )producer).getDBConnection(); + + // Produce 100 records in a transaction and commit. + try { + producer.beginTransaction(); + boolean fail = false; + for( int i=0;i producerRecord = + new ProducerRecord(topic, i+"", jsonPayload); + producerRecord.headers().add(rH1).add(rH2); + try { + processRecord(conn, producerRecord); + } catch(Exception e) { + //Retry processRecord or abort the Okafka transaction and close the producer + fail = true; + break; + } + producer.send(producerRecord); + } + + if(fail) // Failed to process the records. Abort Okafka transaction + producer.abortTransaction(); + else // Successfully process all the records. Commit OKafka transaction + producer.commitTransaction(); + + System.out.println("Produced 100 messages."); + }catch( DisconnectException dcE) { + producer.close(); + }catch (KafkaException e) { + producer.abortTransaction(); + } + } + catch(Exception e) + { + System.out.println("Exception in Main " + e ); + e.printStackTrace(); + } + finally { + try { + if(producer != null) + producer.close(); + }catch(Exception e) + { + System.out.println("Exception while closing producer " + e); + e.printStackTrace(); + + } + System.out.println("Producer Closed"); + } + } + + private static void processRecord(Connection conn, ProducerRecord record) throws Exception + { + //Application specific logic + } + + private static java.util.Properties getProperties() throws IOException { + InputStream inputStream = null; + Properties appProperties; + + try { + Properties prop = new Properties(); + String propFileName = "config.properties"; + inputStream = TransactionalProducerOKafka.class.getClassLoader().getResourceAsStream(propFileName); + if (inputStream != null) { + prop.load(inputStream); + } else { + throw new FileNotFoundException("property file '" + propFileName + "' not found."); + } + appProperties = prop; + + } catch (Exception e) { + System.out.println("Exception: " + e); + throw e; + } finally { + if (inputStream != null) + inputStream.close(); + } + return appProperties; + } +} \ No newline at end of file diff --git a/examples/TxProducer/src/main/resources/config.properties b/examples/TxProducer/src/main/resources/config.properties new file mode 100644 index 0000000..f3ff21a --- /dev/null +++ b/examples/TxProducer/src/main/resources/config.properties @@ -0,0 +1,33 @@ +#OKafka Producer example properties + +#Properties to connect to Oracle Database +#Option 1: Connect to Oracle database using plaintext +#bootstrap.servers= +#oracle.service.name= +#oracle.net.tns_admin= +bootstrap.servers=84.235.172.160:1521 +oracle.service.name="freepdb1" +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/TxProducer/src/main/resources + + +#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet +#security.protocol=SSL +#oracle.net.tns_admin= +#tns.alias= + +#Appliction specific OKafka Producer properties +#topic.name= +topic.name=TXEQ + +batch.size=200 +linger.ms=100 +buffer.memory=335544 + +enable.idempotence=true +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer + +# Property to create a Transactional Producer +oracle.transactional.producer=true + + diff --git a/examples/TxProducer/src/main/resources/ojdbc.properties b/examples/TxProducer/src/main/resources/ojdbc.properties new file mode 100644 index 0000000..6a7530d --- /dev/null +++ b/examples/TxProducer/src/main/resources/ojdbc.properties @@ -0,0 +1,2 @@ +user=okafka_user +password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java index bdd75b0..1838cdb 100644 --- a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java +++ b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java @@ -14,7 +14,8 @@ import java.time.Duration; import java.util.Arrays; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.oracle.okafka.clients.consumer.KafkaConsumer; + import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -39,14 +40,14 @@ public static void main(String[] args) { String topic = appProperties.getProperty("topic.name", "TXEQ"); appProperties.remove("topic.name"); // Pass props to build OKafkaProducer - KafkaConsumer consumer = new KafkaConsumer(appProperties); + KafkaConsumer consumer = new KafkaConsumer<>(appProperties); consumer.subscribe(Arrays.asList(topic)); while(true) { try { - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); if(records != null && records.count() > 0) { diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties index e45f2dd..caecb56 100644 --- a/examples/consumer/src/main/resources/config.properties +++ b/examples/consumer/src/main/resources/config.properties @@ -2,9 +2,14 @@ #Properties to connect to Oracle Database #Option 1: Connect to Oracle database using plaintext -bootstrap.servers= -oracle.service.name= -oracle.net.tns_admin= +#bootstrap.servers= +#oracle.service.name= +#oracle.net.tns_admin= +security.protocol="PLAINTEXT" +bootstrap.servers=84.235.172.160:1521 +oracle.service.name="freepdb1" +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/consumer/src/main/resources + #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet #security.protocol=SSL @@ -12,12 +17,15 @@ oracle.net.tns_admin= #tns.alias= # Application specific OKafka consumer properties -topic.name= -group.id= +#topic.name= +topic.name=TXEQ +#group.id= +group.id=TXEQ_SUBSCRIBER enable.auto.commit=true max.poll.records=1000 +default.api.timeout.ms=180000 -key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer +key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer diff --git a/examples/consumer/src/main/resources/ojdbc.properties b/examples/consumer/src/main/resources/ojdbc.properties index 14f7246..6a7530d 100644 --- a/examples/consumer/src/main/resources/ojdbc.properties +++ b/examples/consumer/src/main/resources/ojdbc.properties @@ -1,2 +1,2 @@ -user= -password= \ No newline at end of file +user=okafka_user +password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/examples/producer/src/main/resources/config.properties b/examples/producer/src/main/resources/config.properties index 0fbbbb2..3c42b16 100644 --- a/examples/producer/src/main/resources/config.properties +++ b/examples/producer/src/main/resources/config.properties @@ -2,9 +2,12 @@ #Properties to connect to Oracle Database #Option 1: Connect to Oracle database using plaintext -bootstrap.servers= -oracle.service.name= -oracle.net.tns_admin= +#bootstrap.servers= +#oracle.service.name= +#oracle.net.tns_admin= +bootstrap.servers=84.235.172.160:1521 +oracle.service.name="freepdb1" +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/producer/src/main/resources #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet @@ -13,6 +16,9 @@ oracle.net.tns_admin= #tns.alias= #Appliction specific OKafka Producer properties +#topic.name= +topic.name=TXEQ + batch.size=200 linger.ms=100 buffer.memory=335544 @@ -21,4 +27,3 @@ enable.idempotence=true key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer -topic.name= \ No newline at end of file diff --git a/examples/producer/src/main/resources/ojdbc.properties b/examples/producer/src/main/resources/ojdbc.properties index 14f7246..6a7530d 100644 --- a/examples/producer/src/main/resources/ojdbc.properties +++ b/examples/producer/src/main/resources/ojdbc.properties @@ -1,2 +1,2 @@ -user= -password= \ No newline at end of file +user=okafka_user +password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 767073f..9b27c3d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,4 +6,7 @@ */ rootProject.name = 'okafka' -include(':clients', 'examples:consumer', 'examples:producer') \ No newline at end of file +include(':clients', 'examples:consumer', 'examples:producer', 'examples:TxProducer', 'examples:TxConsumer') +// include 'examples:TxProducer' +// findProject(':examples:TxProducer')?.name = 'TxProducer' + From 201cd781d7d46549f8982b199ca40bcff2070b78 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Mon, 5 Aug 2024 22:47:34 -0300 Subject: [PATCH 4/7] wip examples 23.4 cleanup credentials --- .gitignore | 1 + examples/TxProducer/src/main/resources/ojdbc.properties | 2 -- examples/consumer/src/main/resources/ojdbc.properties | 2 -- examples/{TxConsumer/src/main/resources => }/ojdbc.properties | 0 examples/producer/src/main/resources/config.properties | 2 +- examples/producer/src/main/resources/ojdbc.properties | 2 -- 6 files changed, 2 insertions(+), 7 deletions(-) delete mode 100644 examples/TxProducer/src/main/resources/ojdbc.properties delete mode 100644 examples/consumer/src/main/resources/ojdbc.properties rename examples/{TxConsumer/src/main/resources => }/ojdbc.properties (100%) delete mode 100644 examples/producer/src/main/resources/ojdbc.properties diff --git a/.gitignore b/.gitignore index 2c413a4..6f25fdb 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ clients/.project clients/.settings/ examples/.project examples/.settings/ +examples/ojdbc.properties examples/consumer/.classpath examples/consumer/.project examples/consumer/.settings/ diff --git a/examples/TxProducer/src/main/resources/ojdbc.properties b/examples/TxProducer/src/main/resources/ojdbc.properties deleted file mode 100644 index 6a7530d..0000000 --- a/examples/TxProducer/src/main/resources/ojdbc.properties +++ /dev/null @@ -1,2 +0,0 @@ -user=okafka_user -password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/examples/consumer/src/main/resources/ojdbc.properties b/examples/consumer/src/main/resources/ojdbc.properties deleted file mode 100644 index 6a7530d..0000000 --- a/examples/consumer/src/main/resources/ojdbc.properties +++ /dev/null @@ -1,2 +0,0 @@ -user=okafka_user -password=Wr_nBa_5dfgqxSpa \ No newline at end of file diff --git a/examples/TxConsumer/src/main/resources/ojdbc.properties b/examples/ojdbc.properties similarity index 100% rename from examples/TxConsumer/src/main/resources/ojdbc.properties rename to examples/ojdbc.properties diff --git a/examples/producer/src/main/resources/config.properties b/examples/producer/src/main/resources/config.properties index 3c42b16..2aa6958 100644 --- a/examples/producer/src/main/resources/config.properties +++ b/examples/producer/src/main/resources/config.properties @@ -7,7 +7,7 @@ #oracle.net.tns_admin= bootstrap.servers=84.235.172.160:1521 oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/producer/src/main/resources +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet diff --git a/examples/producer/src/main/resources/ojdbc.properties b/examples/producer/src/main/resources/ojdbc.properties deleted file mode 100644 index 6a7530d..0000000 --- a/examples/producer/src/main/resources/ojdbc.properties +++ /dev/null @@ -1,2 +0,0 @@ -user=okafka_user -password=Wr_nBa_5dfgqxSpa \ No newline at end of file From 6070f197bbfa67a0a21bd5219876856ebd631cd6 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Mon, 5 Aug 2024 22:50:11 -0300 Subject: [PATCH 5/7] wip examples 23.4 cleanup credentials --- examples/TxConsumer/src/main/resources/config.properties | 2 +- examples/TxProducer/src/main/resources/config.properties | 2 +- examples/consumer/src/main/resources/config.properties | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/TxConsumer/src/main/resources/config.properties b/examples/TxConsumer/src/main/resources/config.properties index e7a9253..0a1f8e7 100644 --- a/examples/TxConsumer/src/main/resources/config.properties +++ b/examples/TxConsumer/src/main/resources/config.properties @@ -8,7 +8,7 @@ security.protocol=PLAINTEXT bootstrap.servers=84.235.172.160:1521 oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/TxConsumer/src/main/resources +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet diff --git a/examples/TxProducer/src/main/resources/config.properties b/examples/TxProducer/src/main/resources/config.properties index f3ff21a..60b38c1 100644 --- a/examples/TxProducer/src/main/resources/config.properties +++ b/examples/TxProducer/src/main/resources/config.properties @@ -7,7 +7,7 @@ #oracle.net.tns_admin= bootstrap.servers=84.235.172.160:1521 oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/TxProducer/src/main/resources +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties index caecb56..debc8ae 100644 --- a/examples/consumer/src/main/resources/config.properties +++ b/examples/consumer/src/main/resources/config.properties @@ -8,7 +8,7 @@ security.protocol="PLAINTEXT" bootstrap.servers=84.235.172.160:1521 oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples/consumer/src/main/resources +oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet From bef51d558a47909802c180f53c28ed0f894faa67 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Mon, 12 Aug 2024 20:14:05 -0300 Subject: [PATCH 6/7] wip examples 23.4 fixed consumer --- README.md | 6 +++-- .../okafka/examples/ConsumerOKafka.java | 27 ++++++++++--------- .../src/main/resources/config.properties | 1 - .../okafka/examples/ProducerOKafka.java | 1 + 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 31c3def..b47d34b 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,8 @@ To run `OKafka application` against Oracle Database, a database user must be cre ```roomsql create user identified by -GRANT CONNECT , RESOURCE to user; +GRANT AQ_USER_ROLE to user; +GRANT CONNECT, RESOURCE, unlimited tablespace to user; GRANT EXECUTE on DBMS_AQ to user; GRANT EXECUTE on DBMS_AQADM to user; GRANT EXECUTE on DBMS_AQIN to user; @@ -33,7 +34,8 @@ GRANT SELECT on GV_$INSTANCE to user; GRANT SELECT on GV_$LISTENER_NETWORK to user; GRANT SELECT on GV_$PDBS to user; GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user; -exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user'); +GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to user; +EXEC DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user'); ``` Once user is created and above privileges are granted, connect to Oracle Database as this user and create a Transactional Event Queue using below PL/SQL script. One can also use `KafkaAdmin` interface as shown in `CreateTopic.java` in `examples` directory to create a Transactional Event Queue. diff --git a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java index 1838cdb..7c2704a 100644 --- a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java +++ b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java @@ -21,7 +21,7 @@ public class ConsumerOKafka { public static void main(String[] args) { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO"); // Get application properties Properties appProperties = null; @@ -43,20 +43,21 @@ public static void main(String[] args) { KafkaConsumer consumer = new KafkaConsumer<>(appProperties); consumer.subscribe(Arrays.asList(topic)); - while(true) { + try { - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); + while(true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - for (ConsumerRecord record : records) - System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); + for (ConsumerRecord record : records) + System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); - if(records != null && records.count() > 0) { - System.out.println("Committing records" + records.count()); - consumer.commitSync(); - } - else { - System.out.println("No Record Fetched. Retrying in 1 second"); - Thread.sleep(1000); + if (records != null && records.count() > 0) { + System.out.println("Committing records" + records.count()); + consumer.commitSync(); + } else { + System.out.println("No Record Fetched. Retrying in 1 second"); + Thread.sleep(1000); + } } }catch(Exception e) { @@ -66,7 +67,7 @@ public static void main(String[] args) { finally { consumer.close(); } - } + } private static java.util.Properties getProperties() throws IOException { diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties index debc8ae..fe836cd 100644 --- a/examples/consumer/src/main/resources/config.properties +++ b/examples/consumer/src/main/resources/config.properties @@ -5,7 +5,6 @@ #bootstrap.servers= #oracle.service.name= #oracle.net.tns_admin= -security.protocol="PLAINTEXT" bootstrap.servers=84.235.172.160:1521 oracle.service.name="freepdb1" oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples diff --git a/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java b/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java index 1cc464f..7e45867 100644 --- a/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java +++ b/examples/producer/src/main/java/org/oracle/okafka/examples/ProducerOKafka.java @@ -8,6 +8,7 @@ package org.oracle.okafka.examples; import org.oracle.okafka.clients.producer.KafkaProducer; + import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; From a0b5fbaaf132db5b3726f17a33ef852da5a8fc59 Mon Sep 17 00:00:00 2001 From: Paulo Alberto Simoes Date: Wed, 14 Aug 2024 23:53:31 -0300 Subject: [PATCH 7/7] Cleanup Examples and test configurations. --- README.md | 26 +-- build.gradle | 79 ------- .../examples/TransactionalConsumerOKafka.java | 172 -------------- .../src/main/resources/config.properties | 36 --- .../examples/TransactionalProducerOKafka.java | 139 ----------- .../src/main/resources/config.properties | 33 --- .../oracle/okafka/examples/CreateTopic.java | 62 ----- .../oracle/okafka/examples/DeleteTopic.java | 58 ----- .../okafka/examples/SimpleConsumer.java | 61 ----- .../okafka/examples/SimpleProducer.java | 88 ------- .../examples/TransactionalConsumer.java | 155 ------------ .../examples/TransactionalProduceConsume.java | 221 ------------------ .../examples/TransactionalProducer.java | 116 --------- .../src/main/resources/config.properties | 11 - .../okafka/examples/ConsumerOKafka.java | 2 +- .../src/main/resources/config.properties | 17 +- examples/ojdbc.properties | 4 +- .../src/main/resources/config.properties | 14 +- settings.gradle | 4 +- 19 files changed, 20 insertions(+), 1278 deletions(-) delete mode 100644 examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java delete mode 100644 examples/TxConsumer/src/main/resources/config.properties delete mode 100644 examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java delete mode 100644 examples/TxProducer/src/main/resources/config.properties delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java delete mode 100644 examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java delete mode 100644 examples/common/src/main/resources/config.properties diff --git a/README.md b/README.md index b47d34b..d102ab5 100644 --- a/README.md +++ b/README.md @@ -128,29 +128,13 @@ gradle javadoc ## Examples -Repository contains 7 common OKafka application examples in `examples` folder. +Repository contains 2 common OKafka application examples in `examples` folder. -`1. CreateTopic.java` - -Connects to Oracle Database and create a topic TXEQ with 10 partitions with default retention time of 7 days. +`1. ProducerOKafka.java` +Produces 10 messages into TxEQ topic. -`2. DeleteTopic.java` -Deletes already created OKafka topic. - -`3. SimpleProducer.java` -Produces 100 messages into TxEQ topic. - -`4. SimpleConsumer.java` -Consumes 100 messages from TxEQ topic. - -`5. TransactionalProducer.java` -Retrieves the Oracle Database Connection object from OKafka producer. Atomically performs a DML operation and sends a record. - -`6. TransactionalConsumer.java` -Retrieves the Oracle Database Connection object from OKafka consumer. Atomically consumes a batch of records and perform DML operation for each record. - - -`7. TransactionalProduceConsume.java` -Atomically consumes a batch of messages from TXEQ topic using OKafka Consumer, processes records from the batch and produces them in the tpic TxEQ_2 using OKafka Producer. To achieve atomicity for this these consume-process-produce operations, application retrieves the Oracle Database Connection object from OKafka Consumer and pass it to create an OKafka Producer. +`2. ConsumerOKafka.java` +Consumes 10 messages from TxEQ topic. ## Kafka Java Client APIs supported diff --git a/build.gradle b/build.gradle index 9f910e8..18cea7c 100644 --- a/build.gradle +++ b/build.gradle @@ -203,85 +203,6 @@ project(':examples:producer') { } } -project(':examples:TxProducer') { - apply plugin : 'java' - apply plugin : 'application' - - sourceCompatibility = minJavaVersion - targetCompatibility = minJavaVersion - - dependencies { - // These dependencies are used by the application. - implementation project(':clients') - implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' - implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' - implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' - } - - tasks.named('jar') { - description('Generates OKafka Transactional Producer jar ') - archiveBaseName = 'okafka' - archiveClassifier = 'txproducer' - - from "${rootProject.projectDir}/LICENSE.txt" - from "${rootProject.projectDir}/NOTICE" - - manifest { - attributes( - 'Implementation-Title' : 'OKafka Transactional Producer', - 'Implementation-Version': project.version - ) - } - } - - tasks.named('run') { - description('Run OKafka Transactional Producer') - application { - mainClass = 'org.oracle.okafka.examples.TransactionalProducerOKafka' - } - } -} - -project(':examples:TxConsumer') { - apply plugin : 'java' - apply plugin : 'application' - - sourceCompatibility = minJavaVersion - targetCompatibility = minJavaVersion - - dependencies { - // These dependencies are used by the application. - implementation project(':clients') - implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05' - implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1' - implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6' - } - - - tasks.named('jar') { - description('Generates OKafka Transactional Consumer jar ') - archiveBaseName = 'okafka' - archiveClassifier = 'txconsumer' - - from "${rootProject.projectDir}/LICENSE.txt" - from "${rootProject.projectDir}/NOTICE" - - manifest { - attributes( - 'Implementation-Title' : 'OKafka Transactional Consumer', - 'Implementation-Version': project.version - ) - } - } - - tasks.named('run') { - description('Run OKafka Transactional Consumer') - application { - mainClass = 'org.oracle.okafka.examples.TransactionalConsumerOKafka' - } - } -} - configurations { childJar } diff --git a/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java b/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java deleted file mode 100644 index efd0d9e..0000000 --- a/examples/TxConsumer/src/main/java/org/oracle/okafka/examples/TransactionalConsumerOKafka.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - ** OKafka Java Client version 23.4. - ** - ** Copyright (c) 2019, 2024 Oracle and/or its affiliates. - ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. - */ - -package org.oracle.okafka.examples; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.Properties; -import java.sql.Connection; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import org.oracle.okafka.clients.consumer.KafkaConsumer; - - -public class TransactionalConsumerOKafka { - - // Dummy implementation of ConsumerRebalanceListener interface - // It only maintains the list of assigned partitions in assignedPartitions list - static class ConsumerRebalance implements ConsumerRebalanceListener { - - public List assignedPartitions = new ArrayList(); - - @Override - public synchronized void onPartitionsAssigned(Collection partitions) { - System.out.println("Newly Assigned Partitions:"); - for (TopicPartition tp :partitions ) { - System.out.println(tp); - assignedPartitions.add(tp); - } - } - - @Override - public synchronized void onPartitionsRevoked(Collection partitions) { - System.out.println("Revoked previously assigned partitions. "); - for (TopicPartition tp :assignedPartitions ) { - System.out.println(tp); - } - assignedPartitions.clear(); - } - } - - public static void main(String[] args) { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); - - // Get application properties - Properties appProperties = null; - try { - appProperties = getProperties(); - if (appProperties == null) { - System.out.println("Application properties not found!"); - System.exit(-1); - } - } catch (Exception e) { - System.out.println("Application properties not found!"); - System.out.println("Exception: " + e); - System.exit(-1); - } - - String topic = appProperties.getProperty("topic.name", "TXEQ"); - appProperties.remove("topic.name"); // Pass props to build OKafkaProducer - - Consumer consumer = new KafkaConsumer<>(appProperties); - ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); - - consumer.subscribe(Arrays.asList(topic), rebalanceListener); - - int expectedMsgCnt = 100; - int msgCnt = 0; - Connection conn = null; - boolean fail = false; - try { - while(true) { - try { - //Consumes records from the assigned partitions of 'TXEQ' topic - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - - if (records.count() > 0 ) - { - conn = ((KafkaConsumer)consumer).getDBConnection(); - fail = false; - for (ConsumerRecord record : records) - { - System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); - for(Header h: record.headers()) - { - System.out.println("Header: " +h.toString()); - } - try { - processRecord(conn, record); - } catch(Exception e) { - fail = true; - break; - } - } - if(fail){ - conn.rollback(); - } - else { - msgCnt += records.count(); - consumer.commitSync(); - } - - if(msgCnt >= (expectedMsgCnt )) { - System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); - break; - } - } - else { - System.out.println("No Record Fetched. Retrying in 1 second"); - Thread.sleep(1000); - } - }catch(Exception e) - { - System.out.println("Exception while consuming messages: " + e.getMessage()); - throw e; - } - } - }catch(Exception e) - { - System.out.println("Exception from OKafka consumer " + e); - e.printStackTrace(); - }finally { - System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records."); - consumer.close(); - } - } - - private static void processRecord(Connection conn, ConsumerRecord record) - { - //Application specific logic to process the message - } - - private static java.util.Properties getProperties() throws IOException { - InputStream inputStream = null; - Properties appProperties = null; - - try { - Properties prop = new Properties(); - String propFileName = "config.properties"; - inputStream = TransactionalConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName); - if (inputStream != null) { - prop.load(inputStream); - } else { - throw new FileNotFoundException("property file '" + propFileName + "' not found."); - } - appProperties = prop; - - } catch (Exception e) { - System.out.println("Exception: " + e); - throw e; - } finally { - inputStream.close(); - } - return appProperties; - } -} \ No newline at end of file diff --git a/examples/TxConsumer/src/main/resources/config.properties b/examples/TxConsumer/src/main/resources/config.properties deleted file mode 100644 index 0a1f8e7..0000000 --- a/examples/TxConsumer/src/main/resources/config.properties +++ /dev/null @@ -1,36 +0,0 @@ -# okafka consumer example properties - -#Properties to connect to Oracle Database -#Option 1: Connect to Oracle database using plaintext -#bootstrap.servers= -#oracle.service.name= -#oracle.net.tns_admin= -security.protocol=PLAINTEXT -bootstrap.servers=84.235.172.160:1521 -oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples - - -#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet -#security.protocol=SSL -#oracle.net.tns_admin= -#tns.alias= - -# Application specific OKafka consumer properties -#topic.name= -topic.name=TXEQ -#group.id= -group.id=CG1 - -enable.auto.commit=false -# Maximum number of records fetched in single poll call -max.poll.records=10 - -key.deserializer=org.apache.kafka.common.serialization.StringDeserializer -value.deserializer=org.apache.kafka.common.serialization.StringDeserializer - - - - - - diff --git a/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java b/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java deleted file mode 100644 index 980490c..0000000 --- a/examples/TxProducer/src/main/java/org/oracle/okafka/examples/TransactionalProducerOKafka.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - ** OKafka Java Client version 23.4. - ** - ** Copyright (c) 2019, 2024 Oracle and/or its affiliates. - ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. - */ - -package org.oracle.okafka.examples; - -import org.oracle.okafka.clients.producer.KafkaProducer; - -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.DisconnectException; -import org.apache.kafka.common.header.internals.RecordHeader; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.sql.Connection; -import java.util.Properties; - -public class TransactionalProducerOKafka { - public static void main(String[] args) { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); - - // Get application properties - Properties appProperties = null; - try { - appProperties = getProperties(); - if (appProperties == null) { - System.out.println("Application properties not found!"); - System.exit(-1); - } - } catch (Exception e) { - System.out.println("Application properties not found!"); - System.out.println("Exception: " + e); - System.exit(-1); - } - - Producer producer = null; - - try { - String topic = appProperties.getProperty("topic.name", "TXEQ"); - appProperties.remove("topic.name"); // Pass props to build OKafkaProducer - - producer = new KafkaProducer<>(appProperties); - - int msgCnt = 100; - String jsonPayload = "{\"name\":\"Programmer"+msgCnt+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}"; - System.out.println(jsonPayload); - producer.initTransactions(); - - Connection conn = ((KafkaProducer )producer).getDBConnection(); - - // Produce 100 records in a transaction and commit. - try { - producer.beginTransaction(); - boolean fail = false; - for( int i=0;i producerRecord = - new ProducerRecord(topic, i+"", jsonPayload); - producerRecord.headers().add(rH1).add(rH2); - try { - processRecord(conn, producerRecord); - } catch(Exception e) { - //Retry processRecord or abort the Okafka transaction and close the producer - fail = true; - break; - } - producer.send(producerRecord); - } - - if(fail) // Failed to process the records. Abort Okafka transaction - producer.abortTransaction(); - else // Successfully process all the records. Commit OKafka transaction - producer.commitTransaction(); - - System.out.println("Produced 100 messages."); - }catch( DisconnectException dcE) { - producer.close(); - }catch (KafkaException e) { - producer.abortTransaction(); - } - } - catch(Exception e) - { - System.out.println("Exception in Main " + e ); - e.printStackTrace(); - } - finally { - try { - if(producer != null) - producer.close(); - }catch(Exception e) - { - System.out.println("Exception while closing producer " + e); - e.printStackTrace(); - - } - System.out.println("Producer Closed"); - } - } - - private static void processRecord(Connection conn, ProducerRecord record) throws Exception - { - //Application specific logic - } - - private static java.util.Properties getProperties() throws IOException { - InputStream inputStream = null; - Properties appProperties; - - try { - Properties prop = new Properties(); - String propFileName = "config.properties"; - inputStream = TransactionalProducerOKafka.class.getClassLoader().getResourceAsStream(propFileName); - if (inputStream != null) { - prop.load(inputStream); - } else { - throw new FileNotFoundException("property file '" + propFileName + "' not found."); - } - appProperties = prop; - - } catch (Exception e) { - System.out.println("Exception: " + e); - throw e; - } finally { - if (inputStream != null) - inputStream.close(); - } - return appProperties; - } -} \ No newline at end of file diff --git a/examples/TxProducer/src/main/resources/config.properties b/examples/TxProducer/src/main/resources/config.properties deleted file mode 100644 index 60b38c1..0000000 --- a/examples/TxProducer/src/main/resources/config.properties +++ /dev/null @@ -1,33 +0,0 @@ -#OKafka Producer example properties - -#Properties to connect to Oracle Database -#Option 1: Connect to Oracle database using plaintext -#bootstrap.servers= -#oracle.service.name= -#oracle.net.tns_admin= -bootstrap.servers=84.235.172.160:1521 -oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples - - -#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet -#security.protocol=SSL -#oracle.net.tns_admin= -#tns.alias= - -#Appliction specific OKafka Producer properties -#topic.name= -topic.name=TXEQ - -batch.size=200 -linger.ms=100 -buffer.memory=335544 - -enable.idempotence=true -key.serializer=org.apache.kafka.common.serialization.StringSerializer -value.serializer=org.apache.kafka.common.serialization.StringSerializer - -# Property to create a Transactional Producer -oracle.transactional.producer=true - - diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java b/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java deleted file mode 100644 index 0031496..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java +++ /dev/null @@ -1,62 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.KafkaFuture; - -import org.oracle.okafka.clients.admin.AdminClient; - -public class SimpleAdminOKafka { - - public static void main(String[] args) { - Properties props = new Properties(); - - //IP or Host name where Oracle Database 23c is running and Database Listener's Port - props.put("bootstrap.servers", "localhost:1521"); - //name of the service running on the database instance - props.put("oracle.service.name", "FREEPDB1"); - props.put("security.protocol","PLAINTEXT"); - // location for ojdbc.properties file where user and password properties are saved - props.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - props.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - props.put("oracle.net.tns_admin","."); - props.put("tns.alias","Oracle23ai_high"); - */ - - try (Admin admin = AdminClient.create(props)) { - //Create Topic named TXEQ and TXEQ_2 with 10 Partitions. - CreateTopicsResult result = admin.createTopics( - Arrays.asList(new NewTopic("TXEQ", 10, (short)0), new NewTopic("TXEQ_2", 10, (short)0))); - try { - KafkaFuture ftr = result.all(); - ftr.get(); - } catch ( InterruptedException | ExecutionException e ) { - - throw new IllegalStateException(e); - } - System.out.println("Closing OKafka admin now"); - } - catch(Exception e) - { - System.out.println("Exception while creating topic " + e); - e.printStackTrace(); - } - } -} diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java b/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java deleted file mode 100644 index 2f8fecc..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - ** OKafka Java Client version 23.4. - ** - ** Copyright (c) 2019, 2024 Oracle and/or its affiliates. - ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. - */ - -package org.oracle.okafka.examples; - -import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import org.apache.kafka.clients.admin.Admin; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.KafkaFuture; -import org.oracle.okafka.clients.admin.AdminClient; -import org.oracle.okafka.clients.admin.DeleteTopicsResult; -import org.oracle.okafka.clients.admin.KafkaAdminClient; - -public class OKafkaDeleteTopic { - - public static void main(String[] args) { - Properties props = new Properties(); - //IP or Host name where Oracle Database 23c is running and Database Listener's Port - props.put("bootstrap.servers", "localhost:1521"); - //name of the service running on the database instance - props.put("oracle.service.name", "FREEPDB1"); - props.put("security.protocol","PLAINTEXT"); - // location for ojdbc.properties file where user and password properties are saved - props.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - props.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - props.put("oracle.net.tns_admin","."); - props.put("tns.alias","Oracle23ai_high"); - */ - try (Admin admin = AdminClient.create(props)) { - - org.apache.kafka.clients.admin.DeleteTopicsResult delResult = admin.deleteTopics(Collections.singletonList("TXEQ")); - - //DeleteTopicsResult delResult = kAdminClient.deleteTopics(Collections.singletonList("TEQ2"), new org.oracle.okafka.clients.admin.DeleteTopicsOptions()); - - Thread.sleep(5000); - System.out.println("Auto Clsoing admin now"); - } - catch(Exception e) - { - System.out.println("Exception while creating topic " + e); - e.printStackTrace(); - } - } - -} diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java b/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java deleted file mode 100644 index 66a636f..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import java.util.Properties; -import java.time.Duration; -import java.util.Arrays; - -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -public class SimpleConsumer { - public static void main(String[] args) { - Properties props = new Properties(); - props.put("bootstrap.servers", "den02tgo.us.oracle.com:9092"); - props.put("group.id" , "test-group"); - props.put("enable.auto.commit","true"); - props.put("max.poll.records", 1000); - - props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - - KafkaConsumer consumer = new KafkaConsumer(props); - consumer.subscribe(Arrays.asList("quickstart-events")); - - while(true) { - try { - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - - for (ConsumerRecord record : records) - System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); - - if(records != null && records.count() > 0) { - System.out.println("Committing records" + records.count()); - consumer.commitSync(); - } - else { - System.out.println("No Record Fetched. Waiting for User input"); - System.in.read(); - } - }catch(Exception e) - { - System.out.println("Exception from consumer " + e); - e.printStackTrace(); - } - finally { - consumer.close(); - } - } - } - -} \ No newline at end of file diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java b/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java deleted file mode 100644 index cf17f18..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java +++ /dev/null @@ -1,88 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import org.oracle.okafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; - -import java.util.ArrayList; -import java.util.Properties; -import java.util.concurrent.Future; - -public class SimpleProducerOKafka { - - public static void main(String[] args) { - long startTime =0; - - try { - Properties props = new Properties(); - - // Option 1: Connect to Oracle Database with database username and password - props.put("security.protocol","PLAINTEXT"); - //IP or Host name where Oracle Database 23ai is running and Database Listener's Port - props.put("bootstrap.servers", "localhost:1521"); - props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance - // location for ojdbc.properties file where user and password properties are saved - props.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - props.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - props.put("oracle.net.tns_admin","."); - props.put("tns.alias","Oracle23ai_high"); - */ - - props.put("enable.idempotence","true"); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - String baseMsg = "This is test with 128 characters Payload used to"+ - " compare result with Apache Kafka with Oracle"+ - " Transactional Event Queue........."; - - Producer producer = new KafkaProducer(props); - Future lastFuture = null; - int msgCnt = 3000; - startTime = System.currentTimeMillis(); - //String key = "KafkaTestKeyWithDifferntCharacters12@$%^&"; - String key = "Just some value for kafka"; - ArrayList> metadataList = new ArrayList>(); - - for(int i=0;i producerRecord = - new ProducerRecord("KTOPIC2", key+i, baseMsg + i); - //producerRecord.headers().add(rH1).add(rH2); - lastFuture = producer.send(producerRecord); - metadataList.add(lastFuture); - /*if(i%10 == 0) - { - Thread.sleep(1000); - }*/ - } - RecordMetadata rd = lastFuture.get(); - System.out.println("Last record placed in " + rd.partition() + " Offset " + rd.offset()); - - Thread.sleep(5000); - System.out.println("Initiating close"); - producer.close(); - long runTime = System.currentTimeMillis() - startTime; - System.out.println("Produced "+ msgCnt +" messages. Run duration " + runTime); - } - catch(Exception e) - { - System.out.println("Exception in Main " + e ); - e.printStackTrace(); - } - } -} diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java deleted file mode 100644 index dd432b1..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java +++ /dev/null @@ -1,155 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import java.util.Properties; -import java.sql.Connection; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import org.oracle.okafka.clients.consumer.KafkaConsumer; - - -public class TransactionalConsumerOKafka { - - // Dummy implementation of ConsumerRebalanceListener interface - // It only maintains the list of assigned partitions in assignedPartitions list - static class ConsumerRebalance implements ConsumerRebalanceListener { - - public List assignedPartitions = new ArrayList(); - - @Override - public synchronized void onPartitionsAssigned(Collection partitions) { - System.out.println("Newly Assigned Partitions:"); - for (TopicPartition tp :partitions ) { - System.out.println(tp); - assignedPartitions.add(tp); - } - } - - @Override - public synchronized void onPartitionsRevoked(Collection partitions) { - System.out.println("Revoked previously assigned partitions. "); - for (TopicPartition tp :assignedPartitions ) { - System.out.println(tp); - } - assignedPartitions.clear(); - } - } - - public static void main(String[] args) { - Properties props = new Properties(); - - // Option 1: Connect to Oracle Database with database username and password - props.put("security.protocol","PLAINTEXT"); - //IP or Host name where Oracle Database 23ai is running and Database Listener's Port - props.put("bootstrap.servers", "localhost:1521"); - props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance - // location for ojdbc.properties file where user and password properties are saved - props.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - props.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - props.put("oracle.net.tns_admin","."); - props.put("tns.alias","Oracle23ai_high"); - */ - - //Consumer Group Name - props.put("group.id" , "CG1"); - props.put("enable.auto.commit","false"); - - // Maximum number of records fetched in single poll call - props.put("max.poll.records", 10); - - props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - Consumer consumer = new KafkaConsumer(props); - ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); - - consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener); - - int expectedMsgCnt = 100; - int msgCnt = 0; - Connection conn = null; - boolean fail = false; - try { - while(true) { - try { - //Consumes records from the assigned partitions of 'TXEQ' topic - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - - if (records.count() > 0 ) - { - conn = ((KafkaConsumer)consumer).getDBConnection(); - fail = false; - for (ConsumerRecord record : records) - { - System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); - for(Header h: record.headers()) - { - System.out.println("Header: " +h.toString()); - } - try { - processRecord(conn, record); - } catch(Exception e) { - fail = true; - break; - } - } - if(fail){ - conn.rollback(); - } - else { - msgCnt += records.count(); - consumer.commitSync(); - } - - if(msgCnt >= (expectedMsgCnt )) { - System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); - break; - } - } - else { - System.out.println("No Record Fetched. Retrying in 1 second"); - Thread.sleep(1000); - } - }catch(Exception e) - { - System.out.println("Exception while consuming messages: " + e.getMessage()); - throw e; - } - } - }catch(Exception e) - { - System.out.println("Exception from OKafka consumer " + e); - e.printStackTrace(); - }finally { - System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records."); - consumer.close(); - } - } - - private static void processRecord(Connection conn, ConsumerRecord record) - { - //Application specific logic to process the message - } -} diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java deleted file mode 100644 index 83ad05a..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java +++ /dev/null @@ -1,221 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.oracle.okafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.DisconnectException; -import org.apache.kafka.common.header.Header; -import org.oracle.okafka.clients.consumer.KafkaConsumer; - -public class TransactionalConsumerProducer { - - static int msgNo =0; - static PreparedStatement instCStmt = null; - static PreparedStatement instPStmt = null; - - public static void main(String[] args) { - Properties commonProps = new Properties(); - Properties cProps = new Properties(); - Properties pProps =new Properties(); - - // Option 1: Connect to Oracle Database with database username and password - commonProps.put("security.protocol","PLAINTEXT"); - //IP or Host name where Oracle Database 23ai is running and Database Listener's Port - commonProps.put("bootstrap.servers", "localhost:1521"); - commonProps.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance - // directory location where ojdbc.properties file is stored which contains user and password properties - commonProps.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - commonProps.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - commonProps.put("oracle.net.tns_admin","."); - commonProps.put("tns.alias","Oracle23ai_high"); - */ - - cProps.putAll(commonProps); - pProps.putAll(commonProps); - - //Consumer Group Name - cProps.put("group.id" , "CG1"); - cProps.put("enable.auto.commit","false"); - - // Maximum number of records fetched in single poll call - cProps.put("max.poll.records", 10); - cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - - pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - pProps.put("oracle.transactional.producer", "true"); - - Consumer consumer = new KafkaConsumer(cProps); - ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance(); - consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener); - - int expectedMsgCnt = 100; - int msgCnt = 0; - Connection conn = null; - - Producer producer = null; - try { - conn = ((KafkaConsumer)consumer).getDBConnection(); - producer = new KafkaProducer(pProps, conn); - producer.initTransactions(); - while(true) { - try { - //Consumes records from the assigned partitions of 'TXEQ' topic - ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); - if(records != null && records.count() > 0) { - msgCnt += records.count(); - - producer.beginTransaction(); - boolean fail =false; - for (ConsumerRecord record : records) { - ProducerRecord pr = null; - try { - String outRecord = processConsumerRecord(conn, record); - pr = new ProducerRecord("TXEQ_2", record.key(), outRecord); - processProducerRecord(conn, pr); - }catch(Exception e) - { - // Stop processing of this batch - fail =true; - break; - } - producer.send(pr); - } - if(fail) { - //Abort consumed and produced records along with any DML operations done using connection object. - //Next consumer.poll will fetch the same records again. - producer.abortTransaction(); - } - else { - //Commit consumed and produced records along with any DML operations done using connection object - producer.commitTransaction(); - } - } - else { - System.out.println("No Record Fetched. Retrying in 1 second"); - Thread.sleep(1000); - } - - if(msgCnt >= expectedMsgCnt ) - { - System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now."); - break; - } - - }catch(DisconnectException dcE) { - System.out.println("Disconnect Exception while committing or aborting records "+ dcE); - throw dcE; - } - catch(KafkaException e) - { - System.out.println("Re-triable Exception while committing records "+ e); - producer.abortTransaction(); - } - catch(Exception e) - { - System.out.println("Exception while processing records " + e.getMessage()); - throw e; - } - } - }catch(Exception e) - { - System.out.println("Exception from OKafka consumer " + e); - e.printStackTrace(); - }finally { - - System.out.println("Closing OKafka Consumer. Received "+ msgCnt); - producer.close(); - consumer.close(); - } - } - - static String processConsumerRecord(Connection conn, ConsumerRecord record) throws Exception - { - //Application specific logic to process the record - System.out.println("Received: " + record.partition() +"," + record.offset() +":" + record.value()); - return record.value(); - } - static void processProducerRecord(Connection conn, ProducerRecord records) throws Exception - { - //Application specific logic to process the record - } - - static void processRecords(Producer porducer, Consumer consumer, ConsumerRecords records) throws Exception - { - Connection conn = ((KafkaProducer)porducer).getDBConnection(); - String jsonPayload = null; - ProducerRecord pr = null; - Future lastFuture = null; - for (ConsumerRecord record : records) - { - msgNo++; - System.out.println("Processing " + msgNo + " record.value() " + record.value()); - System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value()); - for(Header h: record.headers()) - { - System.out.println("Header: " +h.toString()); - } - - jsonPayload = "{\"name\":\"Programmer"+msgNo+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}"; - pr = new ProducerRecord("KTOPIC1", record.key(), jsonPayload); - lastFuture = porducer.send(pr); - RecordMetadata metadata = lastFuture.get(); - } - } - - // Dummy implementation of ConsumerRebalanceListener interface - // It only maintains the list of assigned partitions in assignedPartitions list - static class ConsumerRebalance implements ConsumerRebalanceListener { - - public List assignedPartitions = new ArrayList(); - - @Override - public synchronized void onPartitionsAssigned(Collection partitions) { - System.out.println("Newly Assigned Partitions:"); - for (TopicPartition tp :partitions ) { - System.out.println(tp); - assignedPartitions.add(tp); - } - } - - @Override - public synchronized void onPartitionsRevoked(Collection partitions) { - System.out.println("Revoked previously assigned partitions. "); - for (TopicPartition tp :assignedPartitions ) { - System.out.println(tp); - } - assignedPartitions.clear(); - } - } -} - diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java deleted file mode 100644 index 8d427dc..0000000 --- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* -** OKafka Java Client version 23.4. -** -** Copyright (c) 2019, 2024 Oracle and/or its affiliates. -** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl. -*/ - -package org.oracle.okafka.examples; - -import org.oracle.okafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.errors.DisconnectException; -import org.apache.kafka.common.header.internals.RecordHeader; - -import java.sql.Connection; -import java.util.Properties; - -public class TransactionalProducerOKafka { - public static void main(String[] args) { - Producer producer = null; - try { - Properties props = new Properties(); - - // Option 1: Connect to Oracle Database with database username and password - props.put("security.protocol","PLAINTEXT"); - //IP or Host name where Oracle Database 23ai is running and Database Listener's Port - props.put("bootstrap.servers", "localhost:1521"); - props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance - // location for ojdbc.properties file where user and password properties are saved - props.put("oracle.net.tns_admin","."); - - /* - //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet - //This option to be used when connecting to Oracle autonomous database instance on OracleCloud - props.put("security.protocol","SSL"); - // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file - props.put("oracle.net.tns_admin","."); - props.put("tns.alias","Oracle23ai_high"); - */ - - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - - //Property to create a Transactional Producer - props.put("oracle.transactional.producer", "true"); - - producer = new KafkaProducer(props); - - int msgCnt = 100; - String jsonPayload = "{\"name\":\"Programmer"+msgCnt+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}"; - System.out.println(jsonPayload); - producer.initTransactions(); - - Connection conn = ((KafkaProducer )producer).getDBConnection(); - String topicName = "TXEQ"; - // Produce 100 records in a transaction and commit. - try { - producer.beginTransaction(); - boolean fail = false; - for( int i=0;i producerRecord = - new ProducerRecord(topicName, i+"", jsonPayload); - producerRecord.headers().add(rH1).add(rH2); - try { - processRecord(conn, producerRecord); - } catch(Exception e) { - //Retry processRecord or abort the Okafka transaction and close the producer - fail = true; - break; - } - producer.send(producerRecord); - } - - if(fail) // Failed to process the records. Abort Okafka transaction - producer.abortTransaction(); - else // Successfully process all the records. Commit OKafka transaction - producer.commitTransaction(); - - System.out.println("Produced 100 messages."); - }catch( DisconnectException dcE) { - producer.close(); - }catch (KafkaException e) { - producer.abortTransaction(); - } - } - catch(Exception e) - { - System.out.println("Exception in Main " + e ); - e.printStackTrace(); - } - finally { - try { - if(producer != null) - producer.close(); - }catch(Exception e) - { - System.out.println("Exception while closing producer " + e); - e.printStackTrace(); - - } - System.out.println("Producer Closed"); - } - } - - private static void processRecord(Connection conn, ProducerRecord record) throws Exception - { - //Application specific logic - } - -} \ No newline at end of file diff --git a/examples/common/src/main/resources/config.properties b/examples/common/src/main/resources/config.properties deleted file mode 100644 index 831832f..0000000 --- a/examples/common/src/main/resources/config.properties +++ /dev/null @@ -1,11 +0,0 @@ -#OKafka common properties for common examples - -#Properties to connect to Oracle Database -#Option 1: Connect to Oracle database using plaintext -bootstrap.servers= -oracle.service.name= -oracle.net.tns_admin= -#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet -#security.protocol=SSL -#oracle.net.tns_admin= -#tns.alias= diff --git a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java index 7c2704a..9c48bf2 100644 --- a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java +++ b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java @@ -21,7 +21,7 @@ public class ConsumerOKafka { public static void main(String[] args) { - System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "INFO"); + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); // Get application properties Properties appProperties = null; diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties index fe836cd..65ae47b 100644 --- a/examples/consumer/src/main/resources/config.properties +++ b/examples/consumer/src/main/resources/config.properties @@ -1,13 +1,10 @@ -# okafka consumer example properties +# OKafka Consumer example properties #Properties to connect to Oracle Database #Option 1: Connect to Oracle database using plaintext -#bootstrap.servers= -#oracle.service.name= -#oracle.net.tns_admin= -bootstrap.servers=84.235.172.160:1521 -oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples +bootstrap.servers= +oracle.service.name= +oracle.net.tns_admin= #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet @@ -16,10 +13,8 @@ oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/example #tns.alias= # Application specific OKafka consumer properties -#topic.name= -topic.name=TXEQ -#group.id= -group.id=TXEQ_SUBSCRIBER +topic.name= +group.id= enable.auto.commit=true max.poll.records=1000 diff --git a/examples/ojdbc.properties b/examples/ojdbc.properties index 6a7530d..70bd7ff 100644 --- a/examples/ojdbc.properties +++ b/examples/ojdbc.properties @@ -1,2 +1,2 @@ -user=okafka_user -password=Wr_nBa_5dfgqxSpa \ No newline at end of file +user= +password= \ No newline at end of file diff --git a/examples/producer/src/main/resources/config.properties b/examples/producer/src/main/resources/config.properties index 2aa6958..3111efc 100644 --- a/examples/producer/src/main/resources/config.properties +++ b/examples/producer/src/main/resources/config.properties @@ -1,13 +1,10 @@ -#OKafka Producer example properties +# OKafka Producer example properties #Properties to connect to Oracle Database #Option 1: Connect to Oracle database using plaintext -#bootstrap.servers= -#oracle.service.name= -#oracle.net.tns_admin= -bootstrap.servers=84.235.172.160:1521 -oracle.service.name="freepdb1" -oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/examples +bootstrap.servers= +oracle.service.name= +oracle.net.tns_admin= #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet @@ -16,8 +13,7 @@ oracle.net.tns_admin=/Users/pasimoes/Work/Oracle/Code/okafka/23.4/okafka/example #tns.alias= #Appliction specific OKafka Producer properties -#topic.name= -topic.name=TXEQ +topic.name= batch.size=200 linger.ms=100 diff --git a/settings.gradle b/settings.gradle index 9b27c3d..0d4ce19 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,7 +6,5 @@ */ rootProject.name = 'okafka' -include(':clients', 'examples:consumer', 'examples:producer', 'examples:TxProducer', 'examples:TxConsumer') -// include 'examples:TxProducer' -// findProject(':examples:TxProducer')?.name = 'TxProducer' +include(':clients', 'examples:consumer', 'examples:producer')