diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java index 07627d3b..7b57f125 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkTask.java @@ -21,6 +21,7 @@ import io.tabular.iceberg.connect.api.Committer; import io.tabular.iceberg.connect.api.Writer; import io.tabular.iceberg.connect.channel.CommitterImpl; +import io.tabular.iceberg.connect.data.Utilities; import io.tabular.iceberg.connect.data.WriterImpl; import java.util.Collection; import java.util.Map; @@ -50,26 +51,11 @@ public void start(Map props) { this.config = new IcebergSinkConfig(props); } - private void close(C closeable) { - if (closeable != null) { - if (closeable instanceof AutoCloseable) { - try { - ((AutoCloseable) closeable).close(); - } catch (Exception e) { - LOG.warn( - "An error occurred while trying to close {} instance, ignoring...", - closeable.getClass().getSimpleName(), - e); - } - } - } - } - private void clearState() { - close(writer); + Utilities.close(writer); writer = null; - close(committer); + Utilities.close(committer); committer = null; } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java index a6263711..834037e0 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java @@ -32,7 +32,6 @@ import io.tabular.iceberg.connect.events.EventType; import io.tabular.iceberg.connect.events.TableName; import io.tabular.iceberg.connect.events.TopicPartitionOffset; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -58,7 +57,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class CommitterImpl implements Committer, Closeable { +public class CommitterImpl implements Committer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class); private final SinkTaskContext context; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 4804cfa5..f0ebb45f 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -23,13 +23,13 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.tabular.iceberg.connect.IcebergSinkConfig; +import io.tabular.iceberg.connect.data.Utilities; import io.tabular.iceberg.connect.events.CommitCompletePayload; import io.tabular.iceberg.connect.events.CommitRequestPayload; import io.tabular.iceberg.connect.events.CommitTablePayload; import io.tabular.iceberg.connect.events.Event; import io.tabular.iceberg.connect.events.EventType; import io.tabular.iceberg.connect.events.TableName; -import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; @@ -50,6 +50,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -67,7 +68,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class Coordinator implements Closeable { +class Coordinator implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -87,6 +88,15 @@ class Coordinator implements Closeable { private final ExecutorService exec; private final CommitState commitState; + Coordinator( + IcebergSinkConfig config, + Collection members, + ConsumerFactory consumerFactory, + ProducerFactory producerFactory) { + this(config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory); + } + + @VisibleForTesting Coordinator( IcebergSinkConfig config, Collection members, @@ -364,5 +374,6 @@ public void close() throws IOException { exec.shutdownNow(); producer.close(); consumer.close(); + Utilities.close(catalog); } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java index 4ac63fd4..f4a7352b 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CoordinatorThreadFactoryImpl.java @@ -19,7 +19,6 @@ package io.tabular.iceberg.connect.channel; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.data.Utilities; import java.util.Collection; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.ConsumerGroupDescription; @@ -51,8 +50,7 @@ public CoordinatorThread create(IcebergSinkConfig config) { if (groupDesc.state() == ConsumerGroupState.STABLE) { Collection members = groupDesc.members(); Coordinator coordinator = - new Coordinator( - config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory); + new Coordinator(config, members, consumerFactory, producerFactory); thread = new CoordinatorThread(coordinator); thread.start(); LOG.info("Started commit coordinator"); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java index a11d1cf1..a688b21b 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java @@ -34,14 +34,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class IcebergWriterFactory { +public class IcebergWriterFactory implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class); private final Catalog catalog; private final IcebergSinkConfig config; - public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) { + public IcebergWriterFactory(IcebergSinkConfig config) { + this(config, Utilities.loadCatalog(config)); + } + + @VisibleForTesting + IcebergWriterFactory(IcebergSinkConfig config, Catalog catalog) { this.catalog = catalog; this.config = config; } @@ -109,4 +114,9 @@ Table autoCreateTable(String tableName, SinkRecord sample) { }); return result.get(); } + + @Override + public void close() throws Exception { + Utilities.close(catalog); + } } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java index 84fe76d7..01aebae8 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Utilities.java @@ -264,5 +264,20 @@ private static Class dynamicallyLoad(String className) { return configClass; } + public static void close(C closeable) { + if (closeable != null) { + if (closeable instanceof AutoCloseable) { + try { + ((AutoCloseable) closeable).close(); + } catch (Exception e) { + LOG.warn( + "An error occurred while trying to close {} instance, ignoring...", + closeable.getClass().getSimpleName(), + e); + } + } + } + } + private Utilities() {} } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/WriterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/WriterImpl.java index abfe742a..9376cd3b 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/WriterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/WriterImpl.java @@ -21,7 +21,6 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.api.Committable; import io.tabular.iceberg.connect.api.Writer; -import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -35,7 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class WriterImpl implements Writer, Closeable { +public class WriterImpl implements Writer, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class); private final IcebergSinkConfig config; @@ -44,7 +43,7 @@ public class WriterImpl implements Writer, Closeable { private final Map offsetsByTopicPartition; public WriterImpl(SinkTaskContext context, IcebergSinkConfig config) { - this(context, config, new IcebergWriterFactory(Utilities.loadCatalog(config), config)); + this(context, config, new IcebergWriterFactory(config)); } @VisibleForTesting @@ -172,5 +171,6 @@ private void clearOffsets() { public void close() throws IOException { clearWriters(); clearOffsets(); + Utilities.close(writerFactory); } } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/IcebergWriterFactoryTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/IcebergWriterFactoryTest.java index b15e7394..9ebcfe10 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/IcebergWriterFactoryTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/data/IcebergWriterFactoryTest.java @@ -62,7 +62,7 @@ public void testAutoCreateTable(boolean partitioned) { SinkRecord record = mock(SinkRecord.class); when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2")); - IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config); + IcebergWriterFactory factory = new IcebergWriterFactory(config, catalog); factory.autoCreateTable("db.tbl", record); ArgumentCaptor identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);