diff --git a/.gitignore b/.gitignore
index 2c413a4..6f25fdb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,6 +27,7 @@ clients/.project
 clients/.settings/
 examples/.project
 examples/.settings/
+examples/ojdbc.properties
 examples/consumer/.classpath
 examples/consumer/.project
 examples/consumer/.settings/
diff --git a/README.md b/README.md
index 87018e8..7eee562 100644
--- a/README.md
+++ b/README.md
@@ -22,8 +22,9 @@ To run `OKafka application` against Oracle Database, a database user must be cre
 ```roomsql
 create user <user> identified by <password>
-GRANT CONNECT , RESOURCE to user;
-GRANT UNLIMITED TABLESPACE to user;
+
+GRANT AQ_USER_ROLE to user;
+GRANT CONNECT, RESOURCE, unlimited tablespace to user;
 GRANT EXECUTE on DBMS_AQ to user;
 GRANT EXECUTE on DBMS_AQADM to user;
 GRANT EXECUTE on DBMS_AQIN to user;
@@ -34,7 +35,8 @@ GRANT SELECT on GV_$INSTANCE to user;
 GRANT SELECT on GV_$LISTENER_NETWORK to user;
 GRANT SELECT on GV_$PDBS to user;
 GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
-exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
+GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to user;
+EXEC DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
 ```
 
 Note:
@@ -134,29 +136,13 @@ gradle javadoc
 
 ## Examples
 
-Repository contains 7 common OKafka application examples in `examples` folder.
-
-`1. CreateTopic.java`
-
-Connects to Oracle Database and create a topic TXEQ with 10 partitions with default retention time of 7 days.
-
-`2. DeleteTopic.java`
-Deletes already created OKafka topic.
-
-`3. SimpleProducer.java`
-Produces 100 messages into TxEQ topic.
-
-`4. SimpleConsumer.java`
-Consumes 100 messages from TxEQ topic.
-
-`5. TransactionalProducer.java`
-Retrieves the Oracle Database Connection object from OKafka producer. Atomically performs a DML operation and sends a record.
-
-`6. TransactionalConsumer.java`
-Retrieves the Oracle Database Connection object from OKafka consumer. Atomically consumes a batch of records and perform DML operation for each record.
+The repository contains 2 common OKafka application examples in the `examples` folder.
 
+`1. ProducerOKafka.java`
+Produces 10 messages into the TxEQ topic.
 
-`7. TransactionalProduceConsume.java`
-Atomically consumes a batch of messages from TXEQ topic using OKafka Consumer, processes records from the batch and produces them in the tpic TxEQ_2 using OKafka Producer. To achieve atomicity for this these consume-process-produce operations, application retrieves the Oracle Database Connection object from OKafka Consumer and pass it to create an OKafka Producer.
+`2. ConsumerOKafka.java`
+Consumes 10 messages from the TxEQ topic.
 
 ## Kafka Java Client APIs supported
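The grant script above runs in SQL*Plus; when provisioning from application code instead, the same statements can be issued over JDBC. A minimal sketch under assumed admin credentials (the URL, user names, and password are placeholders, and only a subset of the grants is shown):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GrantOKafkaUserPrivs {
    public static void main(String[] args) throws Exception {
        // Connect as a DBA-capable user; connection details are assumptions.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@//localhost:1521/FREEPDB1", "system", "<password>");
             Statement stmt = conn.createStatement()) {
            String user = "okafka_user"; // the application user created above
            // GRANT text taken directly from the README block
            stmt.execute("GRANT AQ_USER_ROLE to " + user);
            stmt.execute("GRANT CONNECT, RESOURCE, unlimited tablespace to " + user);
            stmt.execute("GRANT EXECUTE on DBMS_AQ to " + user);
            stmt.execute("GRANT EXECUTE on DBMS_AQADM to " + user);
            stmt.execute("GRANT SELECT on SYS.DBA_RSRC_PLAN_DIRECTIVES to " + user);
            // SQL*Plus "EXEC" is shorthand; from JDBC, call the procedure in a PL/SQL block.
            stmt.execute("BEGIN DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('"
                    + user.toUpperCase() + "'); END;");
        }
    }
}
```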
diff --git a/build.gradle b/build.gradle
index 1591b4f..18cea7c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,12 +28,12 @@ allprojects {
     options.windowTitle = "Oracle Database Transactional Event Queues Java API Reference"
     options.header = """Oracle® Database Transactional Event Queues Java API Reference
 23ai
 
 FF46992-04
 """
     options.bottom = """
 Copyright © 2001, 2024, Oracle and/or its affiliates. All rights reserved.
 
 
 """
-    }
+    }
 }
 
 ext {
-    gradleVersion = '7.3'
-    minJavaVersion = JavaVersion.VERSION_1_8
+    gradleVersion = '8.8'
+    minJavaVersion = JavaVersion.VERSION_17
 
     mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : ''
     mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : ''
@@ -42,41 +42,41 @@ ext {
 
 project(':clients') {
     apply plugin : 'java-library'
-
+
     sourceCompatibility = minJavaVersion
     targetCompatibility = minJavaVersion
-
+
     sourceSets {
-        main {
-            java {
-                srcDir 'src/main/java'
-                exclude 'tests/**'
-                exclude 'test/**'
-            }
-        }
-    }
+        main {
+            java {
+                srcDir 'src/main/java'
+                exclude 'tests/**'
+                exclude 'test/**'
+            }
+        }
+    }
 
-    println 'Creating okafka-23.4.0.0.jar'
+    println 'Building okafka 23.4.0.0 Java API jar'
 
     dependencies {
-        // These dependencies are used by the application.
-        implementation group: 'com.oracle.database.jdbc', name: 'ojdbc11', version: '23.4.0.24.05'
-        implementation group: 'com.oracle.database.messaging', name: 'aqapi', version: '23.3.0.0'
-        implementation group: 'javax.transaction', name: 'jta', version: '1.1'
-        implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0'
-        implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
-        implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0'
-        implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
-        // Use JUnit test framework
-        implementation group: 'junit', name: 'junit', version: '4.12'
-
-        // Test dependencies
-        testImplementation group: 'org.easymock', name: 'easymock', version: '3.6'
-        testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5'
-        testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5'
-        testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5'
-
+        // These dependencies are used by the application.
+        implementation group: 'com.oracle.database.jdbc', name: 'ojdbc11', version: '23.4.0.24.05'
+        implementation group: 'com.oracle.database.messaging', name: 'aqapi', version: '23.3.0.0'
+        implementation group: 'javax.transaction', name: 'jta', version: '1.1'
+        implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0'
+        implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
+        implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0'
+        implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
+        // Use JUnit test framework
+        implementation group: 'junit', name: 'junit', version: '4.12'
+
+        // Test dependencies
+        testImplementation group: 'org.easymock', name: 'easymock', version: '3.6'
+        testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5'
+        testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5'
+        testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5'
+
     }
 
     javadoc {
@@ -87,18 +87,21 @@ project(':clients') {
     }
 
     tasks.named('jar') {
-        description('Creating JAR okafka-23.4.0.0.jar ')
+        description('Generates okafka 23.4.0.0 API jar ')
         archiveBaseName = 'okafka'
         archiveVersion = '23.4.0.0'
+
         from "${rootProject.projectDir}/LICENSE.txt"
         from "${rootProject.projectDir}/NOTICE"
 
-        manifest {
-            attributes (
-                'Version': '23.4.0.0',
-                'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss")
-            )
-        }
+        manifest {
+            attributes (
+                'Implementation-Title' : 'okafka',
+                'Implementation-Version': project.version,
+                'Version': '23.4.0.0',
+                'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss")
+            )
+        }
     }
 
     tasks.register('fullJar', Jar) {
@@ -107,7 +110,7 @@ project(':clients') {
         manifest {
             attributes(
                 'Implementation-Title' : 'okafka',
-                'Implementation-Version': project.version)
+                'Implementation-Version': project.version)
         }
 
         from "${rootProject.projectDir}/LICENSE.txt"
@@ -137,6 +140,7 @@ project(':examples:consumer') {
         implementation project(':clients')
         implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
         implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
+        implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6'
     }
 
@@ -150,14 +154,14 @@ project(':examples:consumer') {
         manifest {
             attributes(
                 'Implementation-Title' : 'okafka consumer',
-                'Implementation-Version': project.version)
+                'Implementation-Version': project.version)
         }
     }
 
     tasks.named('run') {
-        description('Run okafka client consumer')
+        description('Run okafka client simple consumer')
         application {
-            mainClass = 'org.oracle.okafka.examples.Consumer'
+            mainClass = 'org.oracle.okafka.examples.ConsumerOKafka'
         }
     }
 }
@@ -174,10 +178,11 @@ project(':examples:producer') {
         implementation project(':clients')
         implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
         implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
+        implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.5.6'
     }
 
     tasks.named('jar') {
-        description('Generates okafka client producer jar ')
+        description('Generates okafka client simple producer jar ')
         archiveBaseName = 'okafka'
         archiveClassifier = 'producer'
@@ -186,14 +191,14 @@ project(':examples:producer') {
         manifest {
             attributes(
                 'Implementation-Title' : 'okafka producer',
-                'Implementation-Version': project.version)
+                'Implementation-Version': project.version)
         }
     }
 
     tasks.named('run') {
-        description('Run okafka client producer')
+        description('Run okafka client simple producer')
         application {
-            mainClass = 'org.oracle.okafka.examples.Producer'
+            mainClass = 'org.oracle.okafka.examples.ProducerOKafka'
         }
     }
 }
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java b/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java
deleted file mode 100644
index 0031496..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/CreateTopic.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.KafkaFuture;
-
-import org.oracle.okafka.clients.admin.AdminClient;
-
-public class SimpleAdminOKafka {
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-
-        //IP or Host name where Oracle Database 23c is running and Database Listener's Port
-        props.put("bootstrap.servers", "localhost:1521");
-        //name of the service running on the database instance
-        props.put("oracle.service.name", "FREEPDB1");
-        props.put("security.protocol","PLAINTEXT");
-        // location for ojdbc.properties file where user and password properties are saved
-        props.put("oracle.net.tns_admin",".");
-
-        /*
-        //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-        //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-        props.put("security.protocol","SSL");
-        // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-        props.put("oracle.net.tns_admin",".");
-        props.put("tns.alias","Oracle23ai_high");
-        */
-
-        try (Admin admin = AdminClient.create(props)) {
-            //Create Topic named TXEQ and TXEQ_2 with 10 Partitions.
-            CreateTopicsResult result = admin.createTopics(
-                    Arrays.asList(new NewTopic("TXEQ", 10, (short)0), new NewTopic("TXEQ_2", 10, (short)0)));
-            try {
-                KafkaFuture<Void> ftr = result.all();
-                ftr.get();
-            } catch ( InterruptedException | ExecutionException e ) {
-
-                throw new IllegalStateException(e);
-            }
-            System.out.println("Closing OKafka admin now");
-        }
-        catch(Exception e)
-        {
-            System.out.println("Exception while creating topic " + e);
-            e.printStackTrace();
-        }
-    }
-}
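This patch deletes CreateTopic.java without a direct replacement, yet the remaining producer and consumer examples still assume the TXEQ topic exists. A minimal sketch of that setup step, modeled on the deleted example above (the connection properties, topic name, and partition count are carried over from it; the class name is hypothetical):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.oracle.okafka.clients.admin.AdminClient;

public class CreateTxEqTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521"); // database listener host:port (assumed)
        props.put("oracle.service.name", "FREEPDB1");     // service name (assumed)
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.net.tns_admin", ".");           // directory holding ojdbc.properties

        // Create the 10-partition TXEQ topic used by ProducerOKafka/ConsumerOKafka.
        try (Admin admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singletonList(new NewTopic("TXEQ", 10, (short) 0)))
                 .all()
                 .get();
        }
    }
}
```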
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java b/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java
deleted file mode 100644
index 2f8fecc..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/DeleteTopic.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- ** OKafka Java Client version 23.4.
- **
- ** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
- ** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
- */
-
-package org.oracle.okafka.examples;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.KafkaFuture;
-import org.oracle.okafka.clients.admin.AdminClient;
-import org.oracle.okafka.clients.admin.DeleteTopicsResult;
-import org.oracle.okafka.clients.admin.KafkaAdminClient;
-
-public class OKafkaDeleteTopic {
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-        //IP or Host name where Oracle Database 23c is running and Database Listener's Port
-        props.put("bootstrap.servers", "localhost:1521");
-        //name of the service running on the database instance
-        props.put("oracle.service.name", "FREEPDB1");
-        props.put("security.protocol","PLAINTEXT");
-        // location for ojdbc.properties file where user and password properties are saved
-        props.put("oracle.net.tns_admin",".");
-
-        /*
-        //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-        //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-        props.put("security.protocol","SSL");
-        // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-        props.put("oracle.net.tns_admin",".");
-        props.put("tns.alias","Oracle23ai_high");
-        */
-        try (Admin admin = AdminClient.create(props)) {
-
-            org.apache.kafka.clients.admin.DeleteTopicsResult delResult = admin.deleteTopics(Collections.singletonList("TXEQ"));
-
-            //DeleteTopicsResult delResult = kAdminClient.deleteTopics(Collections.singletonList("TEQ2"), new org.oracle.okafka.clients.admin.DeleteTopicsOptions());
-
-            Thread.sleep(5000);
-            System.out.println("Auto Clsoing admin now");
-        }
-        catch(Exception e)
-        {
-            System.out.println("Exception while creating topic " + e);
-            e.printStackTrace();
-        }
-    }
-
-}
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java b/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java
deleted file mode 100644
index 66a636f..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleConsumer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import java.util.Properties;
-import java.time.Duration;
-import java.util.Arrays;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-public class SimpleConsumer {
-    public static void main(String[] args) {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", "den02tgo.us.oracle.com:9092");
-        props.put("group.id" , "test-group");
-        props.put("enable.auto.commit","true");
-        props.put("max.poll.records", 1000);
-
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-
-        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(props);
-        consumer.subscribe(Arrays.asList("quickstart-events"));
-
-        while(true) {
-            try {
-                ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(10000));
-
-                for (ConsumerRecord<Integer, String> record : records)
-                    System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
-
-                if(records != null && records.count() > 0) {
-                    System.out.println("Committing records" + records.count());
-                    consumer.commitSync();
-                }
-                else {
-                    System.out.println("No Record Fetched. Waiting for User input");
-                    System.in.read();
-                }
-            }catch(Exception e)
-            {
-                System.out.println("Exception from consumer " + e);
-                e.printStackTrace();
-            }
-            finally {
-                consumer.close();
-            }
-        }
-    }
-
-}
\ No newline at end of file
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java b/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java
deleted file mode 100644
index cf17f18..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/SimpleProducer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import org.oracle.okafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-public class SimpleProducerOKafka {
-
-    public static void main(String[] args) {
-        long startTime =0;
-
-        try {
-            Properties props = new Properties();
-
-            // Option 1: Connect to Oracle Database with database username and password
-            props.put("security.protocol","PLAINTEXT");
-            //IP or Host name where Oracle Database 23ai is running and Database Listener's Port
-            props.put("bootstrap.servers", "localhost:1521");
-            props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
-            // location for ojdbc.properties file where user and password properties are saved
-            props.put("oracle.net.tns_admin",".");
-
-            /*
-            //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-            //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-            props.put("security.protocol","SSL");
-            // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-            props.put("oracle.net.tns_admin",".");
-            props.put("tns.alias","Oracle23ai_high");
-            */
-
-            props.put("enable.idempotence","true");
-            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-            String baseMsg = "This is test with 128 characters Payload used to"+
-                    " compare result with Apache Kafka with Oracle"+
-                    " Transactional Event Queue.........";
-
-            Producer<String, String> producer = new KafkaProducer<String, String>(props);
-            Future<RecordMetadata> lastFuture = null;
-            int msgCnt = 3000;
-            startTime = System.currentTimeMillis();
-            //String key = "KafkaTestKeyWithDifferntCharacters12@$%^&";
-            String key = "Just some value for kafka";
-            ArrayList<Future<RecordMetadata>> metadataList = new ArrayList<Future<RecordMetadata>>();
-
-            for(int i=0;i<msgCnt;i++) {
-                ProducerRecord<String, String> producerRecord =
-                        new ProducerRecord<String, String>("KTOPIC2", key+i, baseMsg + i);
-                //producerRecord.headers().add(rH1).add(rH2);
-                lastFuture = producer.send(producerRecord);
-                metadataList.add(lastFuture);
-                /*if(i%10 == 0)
-                {
-                    Thread.sleep(1000);
-                }*/
-            }
-            RecordMetadata rd = lastFuture.get();
-            System.out.println("Last record placed in " + rd.partition() + " Offset " + rd.offset());
-
-            Thread.sleep(5000);
-            System.out.println("Initiating close");
-            producer.close();
-            long runTime = System.currentTimeMillis() - startTime;
-            System.out.println("Produced "+ msgCnt +" messages. Run duration " + runTime);
-        }
-        catch(Exception e)
-        {
-            System.out.println("Exception in Main " + e );
-            e.printStackTrace();
-        }
-    }
-}
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java
deleted file mode 100644
index dd432b1..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalConsumer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import java.util.Properties;
-import java.sql.Connection;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import org.oracle.okafka.clients.consumer.KafkaConsumer;
-
-
-public class TransactionalConsumerOKafka {
-
-    // Dummy implementation of ConsumerRebalanceListener interface
-    // It only maintains the list of assigned partitions in assignedPartitions list
-    static class ConsumerRebalance implements ConsumerRebalanceListener {
-
-        public List<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
-
-        @Override
-        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            System.out.println("Newly Assigned Partitions:");
-            for (TopicPartition tp :partitions ) {
-                System.out.println(tp);
-                assignedPartitions.add(tp);
-            }
-        }
-
-        @Override
-        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            System.out.println("Revoked previously assigned partitions. ");
-            for (TopicPartition tp :assignedPartitions ) {
-                System.out.println(tp);
-            }
-            assignedPartitions.clear();
-        }
-    }
-
-    public static void main(String[] args) {
-        Properties props = new Properties();
-
-        // Option 1: Connect to Oracle Database with database username and password
-        props.put("security.protocol","PLAINTEXT");
-        //IP or Host name where Oracle Database 23ai is running and Database Listener's Port
-        props.put("bootstrap.servers", "localhost:1521");
-        props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
-        // location for ojdbc.properties file where user and password properties are saved
-        props.put("oracle.net.tns_admin",".");
-
-        /*
-        //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-        //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-        props.put("security.protocol","SSL");
-        // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-        props.put("oracle.net.tns_admin",".");
-        props.put("tns.alias","Oracle23ai_high");
-        */
-
-        //Consumer Group Name
-        props.put("group.id" , "CG1");
-        props.put("enable.auto.commit","false");
-
-        // Maximum number of records fetched in single poll call
-        props.put("max.poll.records", 10);
-
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
-        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
-
-        consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener);
-
-        int expectedMsgCnt = 100;
-        int msgCnt = 0;
-        Connection conn = null;
-        boolean fail = false;
-        try {
-            while(true) {
-                try {
-                    //Consumes records from the assigned partitions of 'TXEQ' topic
-                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
-
-                    if (records.count() > 0 )
-                    {
-                        conn = ((KafkaConsumer<String, String>)consumer).getDBConnection();
-                        fail = false;
-                        for (ConsumerRecord<String, String> record : records)
-                        {
-                            System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
-                            for(Header h: record.headers())
-                            {
-                                System.out.println("Header: " +h.toString());
-                            }
-                            try {
-                                processRecord(conn, record);
-                            } catch(Exception e) {
-                                fail = true;
-                                break;
-                            }
-                        }
-                        if(fail){
-                            conn.rollback();
-                        }
-                        else {
-                            msgCnt += records.count();
-                            consumer.commitSync();
-                        }
-
-                        if(msgCnt >= (expectedMsgCnt )) {
-                            System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
-                            break;
-                        }
-                    }
-                    else {
-                        System.out.println("No Record Fetched. Retrying in 1 second");
-                        Thread.sleep(1000);
-                    }
-                }catch(Exception e)
-                {
-                    System.out.println("Exception while consuming messages: " + e.getMessage());
-                    throw e;
-                }
-            }
-        }catch(Exception e)
-        {
-            System.out.println("Exception from OKafka consumer " + e);
-            e.printStackTrace();
-        }finally {
-            System.out.println("Closing OKafka Consumer. Received "+ msgCnt +" records.");
-            consumer.close();
-        }
-    }
-
-    private static void processRecord(Connection conn, ConsumerRecord<String, String> record)
-    {
-        //Application specific logic to process the message
-    }
-}
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java
deleted file mode 100644
index 83ad05a..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProduceConsume.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.oracle.okafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.header.Header;
-import org.oracle.okafka.clients.consumer.KafkaConsumer;
-
-public class TransactionalConsumerProducer {
-
-    static int msgNo =0;
-    static PreparedStatement instCStmt = null;
-    static PreparedStatement instPStmt = null;
-
-    public static void main(String[] args) {
-        Properties commonProps = new Properties();
-        Properties cProps = new Properties();
-        Properties pProps =new Properties();
-
-        // Option 1: Connect to Oracle Database with database username and password
-        commonProps.put("security.protocol","PLAINTEXT");
-        //IP or Host name where Oracle Database 23ai is running and Database Listener's Port
-        commonProps.put("bootstrap.servers", "localhost:1521");
-        commonProps.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
-        // directory location where ojdbc.properties file is stored which contains user and password properties
-        commonProps.put("oracle.net.tns_admin",".");
-
-        /*
-        //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-        //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-        commonProps.put("security.protocol","SSL");
-        // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-        commonProps.put("oracle.net.tns_admin",".");
-        commonProps.put("tns.alias","Oracle23ai_high");
-        */
-
-        cProps.putAll(commonProps);
-        pProps.putAll(commonProps);
-
-        //Consumer Group Name
-        cProps.put("group.id" , "CG1");
-        cProps.put("enable.auto.commit","false");
-
-        // Maximum number of records fetched in single poll call
-        cProps.put("max.poll.records", 10);
-        cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
-        pProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        pProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        pProps.put("oracle.transactional.producer", "true");
-
-        Consumer<String, String> consumer = new KafkaConsumer<String, String>(cProps);
-        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
-        consumer.subscribe(Arrays.asList("TXEQ"), rebalanceListener);
-
-        int expectedMsgCnt = 100;
-        int msgCnt = 0;
-        Connection conn = null;
-
-        Producer<String, String> producer = null;
-        try {
-            conn = ((KafkaConsumer<String, String>)consumer).getDBConnection();
-            producer = new KafkaProducer<String, String>(pProps, conn);
-            producer.initTransactions();
-            while(true) {
-                try {
-                    //Consumes records from the assigned partitions of 'TXEQ' topic
-                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
-                    if(records != null && records.count() > 0) {
-                        msgCnt += records.count();
-
-                        producer.beginTransaction();
-                        boolean fail =false;
-                        for (ConsumerRecord<String, String> record : records) {
-                            ProducerRecord<String, String> pr = null;
-                            try {
-                                String outRecord = processConsumerRecord(conn, record);
-                                pr = new ProducerRecord<String, String>("TXEQ_2", record.key(), outRecord);
-                                processProducerRecord(conn, pr);
-                            }catch(Exception e)
-                            {
-                                // Stop processing of this batch
-                                fail =true;
-                                break;
-                            }
-                            producer.send(pr);
-                        }
-                        if(fail) {
-                            //Abort consumed and produced records along with any DML operations done using connection object.
-                            //Next consumer.poll will fetch the same records again.
-                            producer.abortTransaction();
-                        }
-                        else {
-                            //Commit consumed and produced records along with any DML operations done using connection object
-                            producer.commitTransaction();
-                        }
-                    }
-                    else {
-                        System.out.println("No Record Fetched. Retrying in 1 second");
-                        Thread.sleep(1000);
-                    }
-
-                    if(msgCnt >= expectedMsgCnt )
-                    {
-                        System.out.println("Received " + msgCnt + " Expected " + expectedMsgCnt +". Exiting Now.");
-                        break;
-                    }
-
-                }catch(DisconnectException dcE) {
-                    System.out.println("Disconnect Exception while committing or aborting records "+ dcE);
-                    throw dcE;
-                }
-                catch(KafkaException e)
-                {
-                    System.out.println("Re-triable Exception while committing records "+ e);
-                    producer.abortTransaction();
-                }
-                catch(Exception e)
-                {
-                    System.out.println("Exception while processing records " + e.getMessage());
-                    throw e;
-                }
-            }
-        }catch(Exception e)
-        {
-            System.out.println("Exception from OKafka consumer " + e);
-            e.printStackTrace();
-        }finally {
-
-            System.out.println("Closing OKafka Consumer. Received "+ msgCnt);
-            producer.close();
-            consumer.close();
-        }
-    }
-
-    static String processConsumerRecord(Connection conn, ConsumerRecord<String, String> record) throws Exception
-    {
-        //Application specific logic to process the record
-        System.out.println("Received: " + record.partition() +"," + record.offset() +":" + record.value());
-        return record.value();
-    }
-    static void processProducerRecord(Connection conn, ProducerRecord<String, String> records) throws Exception
-    {
-        //Application specific logic to process the record
-    }
-
-    static void processRecords(Producer<String, String> porducer, Consumer<String, String> consumer, ConsumerRecords<String, String> records) throws Exception
-    {
-        Connection conn = ((KafkaProducer<String, String>)porducer).getDBConnection();
-        String jsonPayload = null;
-        ProducerRecord<String, String> pr = null;
-        Future<RecordMetadata> lastFuture = null;
-        for (ConsumerRecord<String, String> record : records)
-        {
-            msgNo++;
-            System.out.println("Processing " + msgNo + " record.value() " + record.value());
-            System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
-            for(Header h: record.headers())
-            {
-                System.out.println("Header: " +h.toString());
-            }
-
-            jsonPayload = "{\"name\":\"Programmer"+msgNo+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}";
-            pr = new ProducerRecord<String, String>("KTOPIC1", record.key(), jsonPayload);
-            lastFuture = porducer.send(pr);
-            RecordMetadata metadata = lastFuture.get();
-        }
-    }
-
-    // Dummy implementation of ConsumerRebalanceListener interface
-    // It only maintains the list of assigned partitions in assignedPartitions list
-    static class ConsumerRebalance implements ConsumerRebalanceListener {
-
-        public List<TopicPartition> assignedPartitions = new ArrayList<TopicPartition>();
-
-        @Override
-        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            System.out.println("Newly Assigned Partitions:");
-            for (TopicPartition tp :partitions ) {
-                System.out.println(tp);
-                assignedPartitions.add(tp);
-            }
-        }
-
-        @Override
-        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            System.out.println("Revoked previously assigned partitions. ");
-            for (TopicPartition tp :assignedPartitions ) {
-                System.out.println(tp);
-            }
-            assignedPartitions.clear();
-        }
-    }
-}
-
diff --git a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java b/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java
deleted file mode 100644
index 8d427dc..0000000
--- a/examples/common/src/main/java/org/oracle/okafka/examples/TransactionalProducer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-** OKafka Java Client version 23.4.
-**
-** Copyright (c) 2019, 2024 Oracle and/or its affiliates.
-** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
-*/
-
-package org.oracle.okafka.examples;
-
-import org.oracle.okafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.header.internals.RecordHeader;
-
-import java.sql.Connection;
-import java.util.Properties;
-
-public class TransactionalProducerOKafka {
-    public static void main(String[] args) {
-        Producer<String, String> producer = null;
-        try {
-            Properties props = new Properties();
-
-            // Option 1: Connect to Oracle Database with database username and password
-            props.put("security.protocol","PLAINTEXT");
-            //IP or Host name where Oracle Database 23ai is running and Database Listener's Port
-            props.put("bootstrap.servers", "localhost:1521");
-            props.put("oracle.service.name", "freepdb1"); //name of the service running on the database instance
-            // location for ojdbc.properties file where user and password properties are saved
-            props.put("oracle.net.tns_admin",".");
-
-            /*
-            //Option 2: Connect to Oracle Autonomous Database using Oracle Wallet
-            //This option to be used when connecting to Oracle autonomous database instance on OracleCloud
-            props.put("security.protocol","SSL");
-            // location for Oracle Wallet, tnsnames.ora file and ojdbc.properties file
-            props.put("oracle.net.tns_admin",".");
-            props.put("tns.alias","Oracle23ai_high");
-            */
-
-            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-            //Property to create a Transactional Producer
-            props.put("oracle.transactional.producer", "true");
-
-            producer = new KafkaProducer<String, String>(props);
-
-            int msgCnt = 100;
-            String jsonPayload = "{\"name\":\"Programmer"+msgCnt+"\",\"status\":\"classy\",\"catagory\":\"general\",\"region\":\"north\",\"title\":\"programmer\"}";
-            System.out.println(jsonPayload);
-            producer.initTransactions();
-
-            Connection conn = ((KafkaProducer<String, String>)producer).getDBConnection();
-            String topicName = "TXEQ";
-            // Produce 100 records in a transaction and commit.
-            try {
-                producer.beginTransaction();
-                boolean fail = false;
-                for( int i=0;i<msgCnt;i++) {
-                    RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
-                    RecordHeader rH2 = new RecordHeader("REPLY_TO", "TXEQ_2".getBytes());
-                    ProducerRecord<String, String> producerRecord =
-                            new ProducerRecord<String, String>(topicName, i+"", jsonPayload);
-                    producerRecord.headers().add(rH1).add(rH2);
-                    try {
-                        processRecord(conn, producerRecord);
-                    } catch(Exception e) {
-                        //Retry processRecord or abort the Okafka transaction and close the producer
-                        fail = true;
-                        break;
-                    }
-                    producer.send(producerRecord);
-                }
-
-                if(fail) // Failed to process the records. Abort Okafka transaction
-                    producer.abortTransaction();
-                else // Successfully process all the records. Commit OKafka transaction
-                    producer.commitTransaction();
-
-                System.out.println("Produced 100 messages.");
-            }catch( DisconnectException dcE) {
-                producer.close();
-            }catch (KafkaException e) {
-                producer.abortTransaction();
-            }
-        }
-        catch(Exception e)
-        {
-            System.out.println("Exception in Main " + e );
-            e.printStackTrace();
-        }
-        finally {
-            try {
-                if(producer != null)
-                    producer.close();
-            }catch(Exception e)
-            {
-                System.out.println("Exception while closing producer " + e);
-                e.printStackTrace();
-
-            }
-            System.out.println("Producer Closed");
-        }
-    }
-
-    private static void processRecord(Connection conn, ProducerRecord<String, String> record) throws Exception
-    {
-        //Application specific logic
-    }
-
-}
\ No newline at end of file
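All three transactional examples are removed by this patch. For reference, the consume-process-produce pattern they demonstrated condenses to the following sketch; the class name is hypothetical, the property contents are elided, and the getDBConnection()/KafkaProducer(props, conn) calls are taken from the deleted code above:

```java
import java.sql.Connection;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class AtomicConsumeProduceSketch {
    public static void main(String[] args) throws Exception {
        Properties cProps = new Properties(); // plus connection/group/deserializer props as in the examples
        Properties pProps = new Properties(); // plus connection/serializer props as in the examples
        pProps.put("oracle.transactional.producer", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
        consumer.subscribe(Arrays.asList("TXEQ"));

        // Share the consumer's database connection with the producer so the
        // consumed batch, any DML on conn, and the produced records commit
        // (or roll back) as one unit.
        Connection conn = consumer.getDBConnection();
        KafkaProducer<String, String> producer = new KafkaProducer<>(pProps, conn);
        producer.initTransactions();

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        producer.beginTransaction();
        try {
            for (ConsumerRecord<String, String> record : records) {
                // application DML on conn joins the same transaction here
                producer.send(new ProducerRecord<>("TXEQ_2", record.key(), record.value()));
            }
            producer.commitTransaction(); // offsets, DML, and sends commit atomically
        } catch (KafkaException e) {
            producer.abortTransaction();  // the next poll() redelivers the batch
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
```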
diff --git a/examples/common/src/main/resources/config.properties b/examples/common/src/main/resources/config.properties
deleted file mode 100644
index 831832f..0000000
--- a/examples/common/src/main/resources/config.properties
+++ /dev/null
@@ -1,11 +0,0 @@
-#OKafka common properties for common examples
-
-#Properties to connect to Oracle Database
-#Option 1: Connect to Oracle database using plaintext
-bootstrap.servers=
-oracle.service.name=
-oracle.net.tns_admin=
-#Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
-#security.protocol=SSL
-#oracle.net.tns_admin=
-#tns.alias=
diff --git a/examples/consumer/pom.xml b/examples/consumer/pom.xml
deleted file mode 100644
index 684d740..0000000
--- a/examples/consumer/pom.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>org.oracle.okafka.examples</groupId>
-  <artifactId>consumer</artifactId>
-  <version>23.4.0.0</version>
-
-  <name>consumer</name>
-  <url>https://github.com/oracle/okafka</url>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <maven.compiler.source>${java.version}</maven.compiler.source>
-    <maven.compiler.target>${java.version}</maven.compiler.target>
-    <java.version>11</java.version>
-    <okafka.version>23.4.0.0</okafka.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.oracle.database.messaging</groupId>
-      <artifactId>okafka</artifactId>
-      <version>23.4.0.0</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.13.2</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin>
-        <plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin>
-        <plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin>
-        <plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin>
-        <plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin>
-        <plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin>
-        <plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin>
-        <plugin><artifactId>maven-site-plugin</artifactId><version>3.7.1</version></plugin>
-        <plugin><artifactId>maven-project-info-reports-plugin</artifactId><version>3.0.0</version></plugin>
-      </plugins>
-    </pluginManagement>
-  </build>
-</project>
\ No newline at end of file
diff --git a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java
index d2f91bb..2e4ac09 100644
--- a/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java
+++ b/examples/consumer/src/main/java/org/oracle/okafka/examples/ConsumerOKafka.java
@@ -7,39 +7,59 @@
 package org.oracle.okafka.examples;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Properties;
 import java.time.Duration;
 import java.util.Arrays;
 
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
+import org.oracle.okafka.clients.consumer.KafkaConsumer;
+
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.oracle.okafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 
 public class ConsumerOKafka {
 	public static void main(String[] args) {
-		Properties props = new getProperties();
-		String topic = props.getProperty("topic.name", "TXEQ");
-		props.remove("topic.name"); // Pass props to build OKafkaConsumer
-		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
+		System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
+
+		// Get application properties
+		Properties appProperties = null;
+		try {
+			appProperties = getProperties();
+			if (appProperties == null) {
+				System.out.println("Application properties not found!");
+				System.exit(-1);
+			}
+		} catch (Exception e) {
+			System.out.println("Application properties not found!");
+			System.out.println("Exception: " + e);
+			System.exit(-1);
+		}
+
+		String topic = appProperties.getProperty("topic.name", "TXEQ");
+		appProperties.remove("topic.name"); // Pass props to build OKafkaConsumer
+
+		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(appProperties);
 		consumer.subscribe(Arrays.asList(topic));
 
-		while(true) {
+
 		try {
-			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
+			while(true) {
+				ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
 
-			for (ConsumerRecord<String, String> record : records)
-				System.out.printf("partition = %d, offset = %d, key = %d, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
+				for (ConsumerRecord<String, String> record : records)
+					System.out.printf("partition = %d, offset = %d, key = %s, value =%s\n ", record.partition(), record.offset(), record.key(), record.value());
 
-			if(records != null && records.count() > 0) {
-				System.out.println("Committing records" + records.count());
-				consumer.commitSync();
-			}
-			else {
-				System.out.println("No Record Fetched. Retrying in 1 second");
-				Thread.sleep(1000);
+				if (records != null && records.count() > 0) {
+					System.out.println("Committing records" + records.count());
+					consumer.commitSync();
+				} else {
+					System.out.println("No Record Fetched. Retrying in 1 second");
+					Thread.sleep(1000);
+				}
 			}
 		}catch(Exception e)
 		{
@@ -49,7 +69,7 @@ public static void main(String[] args) {
 		finally {
 			consumer.close();
 		}
-	}
+	}
 
 	private static java.util.Properties getProperties()  throws IOException {
@@ -59,7 +79,7 @@ private static java.util.Properties getProperties() throws IOException {
 		try {
 			Properties prop = new Properties();
 			String propFileName = "config.properties";
-			inputStream = new FileInPutStream(propFileName);
+			inputStream = ConsumerOKafka.class.getClassLoader().getResourceAsStream(propFileName);
 			if (inputStream != null) {
 				prop.load(inputStream);
 			} else {
diff --git a/examples/consumer/src/main/resources/config.properties b/examples/consumer/src/main/resources/config.properties
index 5069223..65ae47b 100644
--- a/examples/consumer/src/main/resources/config.properties
+++ b/examples/consumer/src/main/resources/config.properties
@@ -1,22 +1,25 @@
-# okafka consumer example properties
+# OKafka Consumer example properties
 
 #Properties to connect to Oracle Database
 #Option 1: Connect to Oracle database using plaintext
 bootstrap.servers=
 oracle.service.name=
 oracle.net.tns_admin=
+
+
 #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet
 #security.protocol=SSL
 #oracle.net.tns_admin=
 #tns.alias=
 
 # Application specific OKafka consumer properties
+topic.name=
 group.id=
-enable.auto.commit=false
-max.poll.records=100
+enable.auto.commit=true
+max.poll.records=1000
+default.api.timeout.ms=180000
 
-key.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
-value.deserializer=org.oracle.okafka.common.serialization.StringDeserializer
+key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
+value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
-topic.name=
diff --git a/examples/ojdbc.properties b/examples/ojdbc.properties
new file mode 100644
index 0000000..70bd7ff
--- /dev/null
+++ b/examples/ojdbc.properties
@@ -0,0 +1,2 @@
+user=
+password=
\ No newline at end of file
diff --git a/examples/producer/pom.xml b/examples/producer/pom.xml
deleted file mode 100644
index 7f54b0c..0000000
--- a/examples/producer/pom.xml
+++ /dev/null
@@ -1,82 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>org.oracle.okafka.examples</groupId>
-  <artifactId>producer</artifactId>
-  <version>0.8</version>
-
-  <name>producer</name>
-  <url>https://github.com/oracle/okafka</url>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <maven.compiler.source>${java.version}</maven.compiler.source>
-    <maven.compiler.target>${java.version}</maven.compiler.target>
-    <java.version>11</java.version>
-    <okafka.version>0.8</okafka.version>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.oracle.okafka</groupId>
-      <artifactId>okafka</artifactId>
-      <version>0.8</version>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.13.2</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <pluginManagement>
-      <plugins>
-        <plugin><artifactId>maven-clean-plugin</artifactId><version>3.1.0</version></plugin>
-        <plugin><artifactId>maven-resources-plugin</artifactId><version>3.0.2</version></plugin>
-        <plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.0</version></plugin>
-        <plugin><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin>
-        <plugin><artifactId>maven-jar-plugin</artifactId><version>3.0.2</version></plugin>
-        <plugin><artifactId>maven-install-plugin</artifactId><version>2.5.2</version></plugin>
-        <plugin><artifactId>maven-deploy-plugin</artifactId><version>2.8.2</version></plugin>
-        <plugin><artifactId>maven-site-plugin</artifactId><version>3.7.1</version></plugin>
-        <plugin><artifactId>maven-project-info-reports-plugin</artifactId><version>3.0.0</version></plugin>
-      </plugins>
-    </pluginManagement>
-  </build>
-</project>
\ No newline at end of file
"+ - "Read https://github.com/oracle/okafka/blob/master/README.md"; - - - Future lastFuture = null; - int msgCnt = 10; - String key = "Just some key for OKafka"; - ArrayList> metadataList = new ArrayList>(); - for(int i=0;i producerRecord = - new ProducerRecord(topic, key+i, i+ baseMsg); + RecordHeader rH2 = new RecordHeader("REPLY_TO", "TXEQ_2".getBytes()); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, key+i, i+ baseMsg); producerRecord.headers().add(rH1).add(rH2); lastFuture = producer.send(producerRecord); metadataList.add(lastFuture); } RecordMetadata rd = lastFuture.get(); System.out.println("Last record placed in " + rd.partition() + " Offset " + rd.offset()); - + } + catch(Exception e) { + System.out.println("Failed to send messages:"); + e.printStackTrace(); + } + finally { System.out.println("Initiating close"); producer.close(); - } - catch(Exception e) - { - System.out.println("Exception in Main " + e ); - e.printStackTrace(); } + + } + + private static java.util.Properties getProperties() throws IOException { + InputStream inputStream = null; + Properties appProperties; + + try { + Properties prop = new Properties(); + String propFileName = "config.properties"; + inputStream = ProducerOKafka.class.getClassLoader().getResourceAsStream(propFileName); + if (inputStream != null) { + prop.load(inputStream); + } else { + throw new FileNotFoundException("property file '" + propFileName + "' not found."); + } + appProperties = prop; + + } catch (Exception e) { + System.out.println("Exception: " + e); + throw e; + } finally { + if (inputStream != null) + inputStream.close(); + } + return appProperties; } - - private static java.util.Properties getProperties() throws IOException { - InputStream inputStream = null; - Properties appProperties = null; - - try { - Properties prop = new Properties(); - String propFileName = "config.properties"; - inputStream = new FileInPutStream(propFileName); - if (inputStream != null) { - prop.load(inputStream); - } else { - throw new FileNotFoundException("property file '" + propFileName + "' not found."); - } - appProperties = prop; - - } catch (Exception e) { - System.out.println("Exception: " + e); - throw e; - } finally { - inputStream.close(); - } - return appProperties; - } } diff --git a/examples/producer/src/main/resources/config.properties b/examples/producer/src/main/resources/config.properties index 4f015c2..3111efc 100644 --- a/examples/producer/src/main/resources/config.properties +++ b/examples/producer/src/main/resources/config.properties @@ -1,19 +1,25 @@ -#OKafka Producer example properties +# OKafka Producer example properties #Properties to connect to Oracle Database #Option 1: Connect to Oracle database using plaintext bootstrap.servers= oracle.service.name= oracle.net.tns_admin= + + #Option 2: Connect to Oracle Database deployed in Oracle Autonomous Cloud using Wallet #security.protocol=SSL #oracle.net.tns_admin= #tns.alias= #Appliction specific OKafka Producer properties +topic.name= batch.size=200 linger.ms=100 -key.serializer=org.oracle.okafka.common.serialization.StringSerializer -value.serializer=org.oracle.okafka.common.serialization.StringSerializer -topic.name= \ No newline at end of file +buffer.memory=335544 + +enable.idempotence=true +key.serializer=org.apache.kafka.common.serialization.StringSerializer +value.serializer=org.apache.kafka.common.serialization.StringSerializer + diff --git a/settings.gradle b/settings.gradle index 767073f..0d4ce19 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,4 
diff --git a/settings.gradle b/settings.gradle
index 767073f..0d4ce19 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -6,4 +6,5 @@
 */
 
 rootProject.name = 'okafka'
-include(':clients', 'examples:consumer', 'examples:producer')
\ No newline at end of file
+include(':clients', 'examples:consumer', 'examples:producer')
+
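Since DeleteTopic.java is gone as well, cleaning up the example topic after a run is left to the application. A minimal sketch modeled on the deleted DeleteTopic.java, with the same assumed connection properties as the creation sketch earlier; the class name is hypothetical, and waiting on the result future follows standard Kafka Admin semantics:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.oracle.okafka.clients.admin.AdminClient;

public class DeleteTxEqTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521"); // database listener host:port (assumed)
        props.put("oracle.service.name", "FREEPDB1");     // service name (assumed)
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.net.tns_admin", ".");           // directory holding ojdbc.properties

        // Drop the TXEQ topic created for the producer/consumer examples.
        try (Admin admin = AdminClient.create(props)) {
            admin.deleteTopics(Collections.singletonList("TXEQ")).all().get();
        }
    }
}
```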