Skip to content

Commit

Permalink
Merge pull request #38 from linny0608/updateDependencyVer
Browse files Browse the repository at this point in the history
Enhance Performance, Update Version, and Dependencies
  • Loading branch information
ichokshi2109 authored Feb 1, 2024
2 parents c097b30 + eeb7ade commit 337a351
Show file tree
Hide file tree
Showing 15 changed files with 929 additions and 131 deletions.
861 changes: 819 additions & 42 deletions connectors/THIRD_PARTY_LICENSE.txt

Large diffs are not rendered by default.

16 changes: 2 additions & 14 deletions connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
<java.version>11</java.version>
<logback.version>1.3.0-alpha13</logback.version>
<logback.version>1.4.14</logback.version>
<oracle-db-messaging.version>23.2.0.0</oracle-db-messaging.version>
<oracle-jdbc.version>23.2.0.0</oracle-jdbc.version>
<oracle.database.security.version>21.9.0.0</oracle.database.security.version>
<jms.version>2.0.1</jms.version>
<kafka.version>3.2.0</kafka.version>
<kafka.version>3.6.1</kafka.version>
</properties>

<dependencies>
Expand All @@ -68,13 +68,6 @@
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-runtime -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>3.2.0</version>
</dependency>

<!-- Oracle Database Dependencies -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
Expand Down Expand Up @@ -111,11 +104,6 @@
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand All @@ -24,14 +24,10 @@

package oracle.jdbc.txeventq.kafka.connect.common.utils;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Sanitizer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.Properties;

import javax.management.InstanceAlreadyExistsException;
import javax.management.InstanceNotFoundException;
Expand All @@ -41,10 +37,13 @@
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;

import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.Properties;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Sanitizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AppInfoParser {
private static final Logger log = LoggerFactory.getLogger(AppInfoParser.class);
Expand All @@ -55,13 +54,13 @@ public class AppInfoParser {

static {
Properties props = new Properties();
try (InputStream resourceStream = AppInfoParser.class.getResourceAsStream("/kafka-connect-oracle-version.properties")) {
try (InputStream resourceStream = AppInfoParser.class
.getResourceAsStream("/kafka-connect-oracle-version.properties")) {
props.load(resourceStream);
} catch (IOException e) {
log.warn("Error while loading kafka-connect-oracle-version.properties.");
}


NAME = props.getProperty("name", "unknown").trim();
VERSION = props.getProperty("version", "unknown").trim();
COMMIT_ID = props.getProperty("commitId", "unknown").trim();
Expand All @@ -71,16 +70,19 @@ public class AppInfoParser {
public static String getName() {
return NAME;
}

public static String getVersion() {
return VERSION;
}

public static String getCommitId() {
return COMMIT_ID;
}

public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics) {
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
ObjectName name = new ObjectName(
prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
AppInfoParser.AppInfo mBean = new AppInfoParser.AppInfo();
ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);
registerMetrics(metrics); // prefix will be added later by JmxReporter
Expand All @@ -98,7 +100,8 @@ public static synchronized void registerAppInfo(String prefix, String id, Metric
public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) {
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
ObjectName name = new ObjectName(
prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));
if (server.isRegistered(name))
server.unregisterMBean(name);

Expand All @@ -118,8 +121,10 @@ private static MetricName metricName(Metrics metrics, String name) {

private static void registerMetrics(Metrics metrics) {
if (metrics != null) {
metrics.addMetric(metricName(metrics, "version"), new AppInfoParser.ImmutableValue<>(VERSION));
metrics.addMetric(metricName(metrics, "commit-id"), new AppInfoParser.ImmutableValue<>(COMMIT_ID));
metrics.addMetric(metricName(metrics, "version"),
new AppInfoParser.ImmutableValue<>(VERSION));
metrics.addMetric(metricName(metrics, "commit-id"),
new AppInfoParser.ImmutableValue<>(COMMIT_ID));
}
}

Expand All @@ -132,14 +137,15 @@ private static void unregisterMetrics(Metrics metrics) {

public interface AppInfoMBean {
public String getVersion();

public String getCommitId();
}

public static class AppInfo implements AppInfoParser.AppInfoMBean {

public AppInfo() {
}

@Override
public String getVersion() {
return AppInfoParser.getVersion();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
/*
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package oracle.jdbc.txeventq.kafka.connect.common.utils;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
** Kafka Connect for TxEventQ version 1.0.
** Kafka Connect for TxEventQ.
**
** Copyright (c) 2019, 2022 Oracle and/or its affiliates.
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
** Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
*/

Expand All @@ -24,14 +24,15 @@

package oracle.jdbc.txeventq.kafka.connect.source.utils;

import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;

/**
* A message is the unit that is enqueued or dequeued. An TEQ Message object holds both its content, or payload, and its properties.
* This class provides methods to get and set message properties and the payload.
* A message is the unit that is enqueued or dequeued. An TEQ Message object holds both its content,
* or payload, and its properties. This class provides methods to get and set message properties and
* the payload.
*
* @param <T>
*/
Expand All @@ -42,52 +43,54 @@ public class TxEventQSourceRecord extends SourceRecord {
private byte[] messageId = new byte[0];

public enum PayloadType {
RAW, JSON, JMS
RAW, JSON, JMS
}

private final PayloadType payloadType;

public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic,
Integer partition, Schema valueSchema, Object value, PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition, Schema valueSchema, Object value, PayloadType type,
byte[] msgId) {
super(sourcePartition, sourceOffset, topic, partition, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
}

public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic,
Integer partition, Schema keySchema, java.lang.Object key, Schema valueSchema, Object value,
PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Integer partition, Schema keySchema, java.lang.Object key,
Schema valueSchema, Object value, PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
}

public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic,
Schema valueSchema, Object value, PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Schema valueSchema, Object value, PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
}

public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, String topic,
Schema keySchema, java.lang.Object key, Schema valueSchema, Object value, PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
public TxEventQSourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
String topic, Schema keySchema, java.lang.Object key, Schema valueSchema, Object value,
PayloadType type, byte[] msgId) {
super(sourcePartition, sourceOffset, topic, keySchema, key, valueSchema, value);
this.payloadType = type;
this.messageId = msgId;
}

public PayloadType getPayloadType() {
return payloadType;
return payloadType;
}

public String getMessageId() {
return byteArrayToHex(messageId);
return byteArrayToHex(messageId);
}

private static String byteArrayToHex(byte[] a) {
StringBuilder sb = new StringBuilder(a.length * 2);
for (byte b : a)
sb.append(String.format("%02x", b));
return sb.toString();
StringBuilder sb = new StringBuilder(a.length * 2);
for (byte b : a)
sb.append(String.format("%02x", b));
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#
## Kafka Connect for TxEventQ version 1.0.
## Kafka Connect for TxEventQ.
##
## Copyright (c) 2019, 2022 Oracle and/or its affiliates.
## Copyright (c) 2023, 2024 Oracle and/or its affiliates.
## Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
#

Expand Down
Loading

0 comments on commit 337a351

Please sign in to comment.