Modification in build.gradle and examples #51

Open

wants to merge 37 commits into base: okafka-23.4.0.0

Changes from 14 commits

Commits (37)
583c8af
changes in build.gradle and examples
s-ourabh Jul 23, 2024
fb2aaff
merged oracle/okafka-23.4.0.0 to s-ourabh/okafka-23.4.0.0
s-ourabh Jul 23, 2024
b428fd6
syncing the branch okafka-23.4.0.0 with the okafkaWithKafka3.7
s-ourabh Jul 23, 2024
4dfa81b
some changes in example and build.gradle
s-ourabh Jul 23, 2024
b790666
slight change in example
s-ourabh Jul 25, 2024
c27b6d7
Merge remote-tracking branch 'upstream/okafka-23.4.0.0' into okafka-2…
s-ourabh Aug 2, 2024
9d5b723
pull from okafka-23.4.0.0
s-ourabh Aug 2, 2024
b47a75f
Merge pull request #53 from oracle/okafka-23.4.0.0
pasimoes Aug 15, 2024
af16144
syncing with the master
s-ourabh Aug 21, 2024
b645ad1
Update information in Readme file.
linny0608 Aug 21, 2024
3bafb4f
owner instance and enqueue instance issue
s-ourabh Aug 26, 2024
bd94c78
only compile tests while build
s-ourabh Aug 26, 2024
3c89c5a
removed comment
s-ourabh Aug 26, 2024
d1f9ac0
Merge pull request #56 from s-ourabh/ownerInstanceIssue
ichokshi2109 Aug 26, 2024
a5dfae5
Cleanup stale entries from user_queue_partition_assignment_table.
Aug 26, 2024
b032597
Changes to use JDK 11
Aug 26, 2024
f1368f9
Merge remote-tracking branch 'origin/master'
Aug 26, 2024
b99fd2c
Add topic name format requirement.
pasimoes Aug 27, 2024
9605df5
Fix format of code block
pasimoes Aug 27, 2024
b267192
text format
pasimoes Aug 27, 2024
e02b11c
Changes to make TopicName case insensitive.
Aug 27, 2024
6fd7271
TopicName case insensitive for kafka admin
Aug 27, 2024
4a289af
Make topicname case-insensitive for KafkaAdmin
Aug 27, 2024
30d0215
Merge pull request #59 from oracle/CaseInsensitive
ichokshi2109 Aug 28, 2024
0ef791d
Merge pull request #54 from linny0608/modifyConnectorReadme
ichokshi2109 Aug 28, 2024
ab7a264
Update build.gradle
ichokshi2109 Sep 4, 2024
3d517aa
Merge pull request #60 from oracle/ichokshi2109-patch-1
ichokshi2109 Sep 4, 2024
ed15ea2
transactional producer serialized key and value null pointer exceptio…
s-ourabh Oct 3, 2024
b8f67a0
Merge pull request #62 from s-ourabh/transactional_producer_Nullpoint…
ichokshi2109 Oct 3, 2024
d73eac9
Merge branch 'master' into 57-docs-topic-name-uppercase
pasimoes Oct 4, 2024
421c478
sync with upstream
s-ourabh Oct 10, 2024
31b6956
Merge pull request #58 from oracle/57-docs-topic-name-uppercase
ichokshi2109 Oct 15, 2024
f5d3d03
Update README.md
ichokshi2109 Oct 16, 2024
624cb2d
Merge pull request #64 from oracle/ichokshi2109-patch-2
ichokshi2109 Oct 17, 2024
33033a4
conflict resolve
s-ourabh Oct 17, 2024
31ccea3
Merge remote-tracking branch 'upstream/master'
s-ourabh Oct 17, 2024
32716dd
Merge branch 'master' into okafkaWithKafka3.7
s-ourabh Oct 17, 2024
24 changes: 11 additions & 13 deletions build.gradle
@@ -18,12 +18,12 @@ allprojects {
}

group = 'org.oracle.okafka'
-version = '23.4.0.0'
+version = '23.5.0.0'

tasks.withType(Javadoc) {
// disable the crazy super-strict doclint tool in Java 8
// noinspection SpellCheckingInspection
title ="Oracle Kafka 23.4.0.0 API"
title ="Oracle Kafka 23.5.0.0 API"
options.addStringOption('Xdoclint:none', '-quiet')
options.windowTitle = "Oracle Database Transactional Event Queues Java API Reference"
options.header = """<b>Oracle&reg; Database Transactional Event Queues Java API Reference<br>23ai</b><br>FF46992-04<br>"""
@@ -33,7 +33,7 @@ allprojects {

ext {
gradleVersion = '8.8'
-minJavaVersion = JavaVersion.VERSION_17
+minJavaVersion = JavaVersion.VERSION_11

mavenUrl = project.hasProperty('mavenUrl') ? project.mavenUrl : ''
mavenUsername = project.hasProperty('mavenUsername') ? project.mavenUsername : ''
@@ -54,7 +54,7 @@ project(':clients') {
}
}

-println 'Building okafka 23.4.0.0 Java API jar'
+println 'Building okafka 23.5.0.0 Java API jar'

dependencies {

@@ -65,16 +65,14 @@ project(':clients') {
implementation group: 'javax.jms', name: 'javax.jms-api', version: '2.0'
implementation group: 'com.oracle.database.security', name: 'oraclepki', version: '23.4.0.24.05'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.0-alpha0'
+implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
-// Use JUnit test framework
-implementation group: 'junit', name: 'junit', version: '4.12'
-
-implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'

// Test dependencies
testImplementation group: 'org.easymock', name: 'easymock', version: '3.6'
testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.0-beta.5'
testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.5'
testImplementation group: 'org.powermock', name: 'powermock-api-easymock', version: '2.0.0-beta.5'

+testImplementation group: 'junit', name: 'junit', version: '4.12'
}

javadoc {
@@ -85,9 +83,9 @@ project(':clients') {
}

tasks.named('jar') {
-description('Generates okafka 23.4.0.0 API jar ')
+description('Generates okafka 23.5.0.0 API jar ')
archiveBaseName = 'okafka'
-archiveVersion = '23.4.0.0'
+archiveVersion = '23.5.0.0'

from "${rootProject.projectDir}/LICENSE.txt"
from "${rootProject.projectDir}/NOTICE"
@@ -96,7 +94,7 @@ project(':clients') {
attributes (
'Implementation-Title' : 'okafka',
'Implementation-Version': project.version,
-'Version': '23.4.0.0',
+'Version': '23.5.0.0',
'Build-Time-ISO-8601':new Date().format("yyyy-MM-dd HH:mm:ss")
)
}
@@ -218,4 +216,4 @@ dependencies {
task multiProjectJar (type: Jar, dependsOn: configurations.childJar) {
description 'Generates a jar containing okafka client, all its dependencies and examples for okafka demo'
from { configurations.childJar.collect { zipTree(it) } }
}
}
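For downstream builds, the version bump changes the coordinates by which the artifact is consumed. A minimal sketch of a downstream dependency block, assuming the jar keeps the Maven coordinates used by earlier okafka releases (`com.oracle.database.messaging:okafka`); adjust if you build and install the jar locally:

```gradle
dependencies {
    // assumed coordinates, matching earlier okafka releases on Maven Central
    implementation group: 'com.oracle.database.messaging', name: 'okafka', version: '23.5.0.0'
    // this PR builds against the Apache Kafka 3.7.1 client API
    implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.7.1'
}
```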
@@ -1598,14 +1598,15 @@ private static boolean groupIdIsUnrepresentable(String groupId) {
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<TopicMetadataAndConfig>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());

for (NewTopic newTopic : newTopics) {
if (topicNameIsUnrepresentable(newTopic.name())) {
KafkaFutureImpl<TopicMetadataAndConfig> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException(
"The given topic name '" + newTopic.name() + "' cannot be represented in a request."));
-topicFutures.put(newTopic.name(), future);
-} else if (!topicFutures.containsKey(newTopic.name())) {
-topicFutures.put(newTopic.name(), new KafkaFutureImpl<TopicMetadataAndConfig>());
+topicFutures.put(newTopic.name().toUpperCase(), future);
+} else if (!topicFutures.containsKey(newTopic.name().toUpperCase())) {
+topicFutures.put(newTopic.name().toUpperCase(), new KafkaFutureImpl<TopicMetadataAndConfig>());
TopicDetails topicDetails = null;

if (newTopic.replicasAssignments() != null) {
@@ -1622,7 +1623,7 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics, fin
topicDetails = new TopicDetails(newTopic.numPartitions(), newTopic.replicationFactor());
}
}
-topicsMap.put(newTopic.name(), topicDetails);
+topicsMap.put(newTopic.name().toUpperCase(), topicDetails);
}
}
final long now = time.milliseconds();
@@ -1693,17 +1694,21 @@ public DeleteTopicsResult deleteTopics(Collection<String> topicNames, DeleteTopi

public org.oracle.okafka.clients.admin.DeleteTopicsResult deleteTopics(Collection<String> topicNames,
org.oracle.okafka.clients.admin.DeleteTopicsOptions options) {

final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
final List<String> validTopicNames = new ArrayList<>(topicNames.size());

for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {

KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException(
"The given topic name '" + topicName + "' cannot be represented in a request."));
-topicFutures.put(topicName, future);
-} else if (!topicFutures.containsKey(topicName)) {
-topicFutures.put(topicName, new KafkaFutureImpl<Void>());
-validTopicNames.add(topicName);
+topicFutures.put(topicName.toUpperCase(), future);
+} else if (!topicFutures.containsKey(topicName.toUpperCase())) {
+topicFutures.put(topicName.toUpperCase(), new KafkaFutureImpl<Void>());
+validTopicNames.add(topicName.toUpperCase());
}
}
final long now = time.milliseconds();
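The net effect of the two hunks above is that every topic name is folded to upper case before it is used as a future-map key or sent in a request, matching Oracle Database's default upper-case object naming. A minimal caller-side sketch of the resulting behavior; the connection properties are placeholders, not values from this PR:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521");     // placeholder database listener
        props.put("oracle.service.name", "freepdb1");         // placeholder service name
        props.put("oracle.net.tns_admin", "/path/to/ojdbc");  // placeholder config directory
        props.put("security.protocol", "PLAINTEXT");

        try (Admin admin = org.oracle.okafka.clients.admin.AdminClient.create(props)) {
            // "orders" is keyed and created as "ORDERS" after this change, so a
            // later create/delete with different casing resolves to the same topic
            admin.createTopics(Collections.singletonList(new NewTopic("orders", 3, (short) 1)))
                 .all().get();
            admin.deleteTopics(Collections.singletonList("Orders")).all().get();
        }
    }
}
```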
@@ -739,19 +739,25 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");

} else if (topics.isEmpty()) {
// treat subscribing to empty topic list as the same as unsubscribing
this.unsubscribe();
} else {
if (topics.size() > 1)
throw new IllegalArgumentException("Only one topic can be subscribed");


+Collection<String> topicsUp = new ArrayList<String>(topics.size());

for (String topic : topics) {
if (topic == null || topic.trim().isEmpty())
throw new IllegalArgumentException(
"Topic collection to subscribe to cannot contain null or empty topic");

+topicsUp.add(topic.toUpperCase());
}

+topics = topicsUp;

// Only one topic can be subscribed, unsubscribe from previous topics before
// subscribing to new topic
Set<String> Alltopics = subscriptions.metadataTopics();
@@ -765,7 +771,6 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
// metadata.setTopics(subscriptions.groupSubscription());
// Change for 2.8.1 groupSubscription() is not present any more
metadata.setTopics(subscribedTopicSet);

}
} finally {
release();
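On the consumer side, subscribe() still accepts exactly one topic and now normalizes it to upper case before updating the subscription. A caller-side sketch under the same placeholder properties:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class SubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521");  // placeholder
        props.put("oracle.service.name", "freepdb1");      // placeholder
        props.put("security.protocol", "PLAINTEXT");
        props.put("group.id", "DEMO_GROUP");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // normalized to "ORDERS"; a list with more than one topic
            // would throw IllegalArgumentException per the check above
            consumer.subscribe(Collections.singletonList("orders"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.topic() + ": " + record.value());
        }
    }
}
```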

Large diffs are not rendered by default.

@@ -1778,7 +1778,8 @@ public void abortTransaction() throws ProducerFencedException {
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
-return send(record, null);
+ProducerRecord<K,V> recordUp = new ProducerRecord<K,V>(record.topic().toUpperCase(), record.partition(), record.timestamp(), record.key(), record.value(), record.headers());
+return send(recordUp, null);
}

/**
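With this hunk, the upper-casing happens at the public send() entry point, so the RecordMetadata handed back to the caller reports the normalized topic name. A small sketch, again with placeholder properties:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class SendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521");  // placeholder
        props.put("oracle.service.name", "freepdb1");      // placeholder
        props.put("security.protocol", "PLAINTEXT");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            RecordMetadata md = producer.send(new ProducerRecord<>("orders", "k1", "v1")).get();
            System.out.println(md.topic());  // "ORDERS" after this change
        }
    }
}
```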
@@ -359,7 +359,7 @@ public Future<RecordMetadata> transactionalSend(TopicPartition tp, byte[] serial
int msgVersion = topicTeqParam.getMsgVersion();

BytesMessage byteMessage = createBytesMessage(topicPbs.sess, tp,
-ByteBuffer.wrap(serializedKey), ByteBuffer.wrap(serializedValue), headers, msgVersion);
+serializedKey != null ? ByteBuffer.wrap(serializedKey) : null, serializedValue != null ? ByteBuffer.wrap(serializedValue) : null, headers, msgVersion);

try {
tps.publish(byteMessage, DeliveryMode.PERSISTENT, 0, AQjmsConstants.EXPIRATION_NEVER);
@@ -387,7 +387,7 @@ public Future<RecordMetadata> transactionalSend(TopicPartition tp, byte[] serial
Collections.singletonList(thisOffset), publishException);

frm = new FutureRecordMetadata(produceResult, 0, System.currentTimeMillis(),
-serializedKey.length, serializedValue.length, time);
+serializedKey != null ? serializedKey.length : 0, serializedValue != null ? serializedValue.length : 0, time);

produceResult.done();
this.oTxm.addRecordToTransaction(frm);
@@ -398,7 +398,7 @@ public Future<RecordMetadata> transactionalSend(TopicPartition tp, byte[] serial
produceResult = new ProduceRequestResult(tp);
produceResult.set(-1L, -1L, null, new RuntimeException(e));
frm = new FutureRecordMetadata(produceResult, -1l, System.currentTimeMillis(),
-serializedKey.length, serializedValue.length, time);
+serializedKey != null ? serializedKey.length : 0, serializedValue != null ? serializedValue.length : 0, time);
produceResult.done();
}
return frm;
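These three guards address the NullPointerException mentioned in this PR's commit log for transactional sends whose serialized key or value is null: previously ByteBuffer.wrap(null) or serializedKey.length would throw. A sketch of the case they cover; the connection properties are placeholders, and `oracle.transactional.producer` is the property name used in okafka's transactional examples — treat it as an assumption here:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.oracle.okafka.clients.producer.KafkaProducer;

public class NullKeySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:1521");    // placeholder
        props.put("oracle.service.name", "freepdb1");        // placeholder
        props.put("security.protocol", "PLAINTEXT");
        props.put("oracle.transactional.producer", "true");  // assumed property name
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // null key: serializedKey is null, which the guarded code now tolerates
            producer.send(new ProducerRecord<>("ORDERS", null, "payload-without-key"));
            producer.commitTransaction();
        }
    }
}
```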
8 changes: 6 additions & 2 deletions connectors/README.md
@@ -68,11 +68,14 @@ exec sys.dbms_aqadm.set_queue_parameter('TxEventQ', 'KEY_BASED_ENQUEUE', 2);
exec sys.dbms_aqadm.start_queue('TxEventQ');
exec sys.dbms_aqadm.add_subscriber('TxEventQ', SYS.AQ$_AGENT('SUB1', NULL, 0));
```
+### Oracle Database Automatic Memory Management
+It is recommended that the database be configured to allow automatic memory management. Refer to [About Automatic Memory Management](https://docs.oracle.com/en/database/oracle/oracle-database/23/admin/managing-memory.html#GUID-0F348EAB-9970-4207-8EF3-0F58B64E959A)
+for information on how to allow the Oracle Database instance to automatically manage instance memory.

### Setup Oracle RAC Cluster for Cross Instance Enqueues
If running an Oracle RAC cluster read the instructions here for [User Event Streaming](https://docs.oracle.com/en/database/oracle/oracle-database/23/adque/aq-performance-scalability.html#GUID-423633E9-9B72-45B5-9C3E-95386BBEDBA0)
to properly configure the **REMOTE_LISTENER** parameter. The **REMOTE_LISTENER** configuration is necessary to produce messages to the event stream mapped to the respective Kafka partition. If the
-**REMOTE_LISTENER** parameter is not configured, the sink connector will fail with `ORA-25348`. **Note:** Also set the isRac property to true in the `connect-txeventq-sink.properties` file.
+**REMOTE_LISTENER** parameter is not configured, the sink connector will fail with `ORA-25348`.
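For illustration, the REMOTE_LISTENER setting typically looks like the following; the SCAN host and port are placeholders, and the linked documentation remains the authoritative reference:

```sql
-- placeholder SCAN address; run as a privileged user on the RAC database
ALTER SYSTEM SET REMOTE_LISTENER='rac-scan.example.com:1521' SCOPE=BOTH SID='*';
```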

### Steps to Create an Oracle Wallet

@@ -236,7 +239,8 @@ bootstrap.servers=<broker i.e localhost:9092>
### Running TxEventQ Kafka connect sink or source connectors

Update Kafka's `connect-standalone.properties` or `connect-distributed.properties` configuration file located in Kafka's config directory `plugin.path=` property with the
-directory path to where the jar file for the Sink Connector is located.
+directory path to where the jar file for the connectors is located. Add the `consumer.max.poll.records` property to either the `connect-standalone.properties` or `connect-distributed.properties`
+to increase the number of records that will be sent by the sink connector for each poll. The default value for the `consumer.max.poll.records` is 500.
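For example, the property is a single line appended to the chosen worker configuration; the value 1000 below is illustrative (the default is 500):

```
# connect-standalone.properties or connect-distributed.properties
consumer.max.poll.records=1000
```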

In Kafka's config directory, locate and open the zookeeper.properties file and update the dataDir property with the directory path where you installed Kafka.
The property should have a value such as dataDir=c:/kafka/zookeeper-data if the path to Kafka is c:\kafka. The same file will need to be updated in a Linux environment,