Skip to content

Commit

Permalink
Fix Catalog resource leak
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Apr 9, 2024
1 parent b693d59 commit faa2a2b
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.tabular.iceberg.connect.api.Committer;
import io.tabular.iceberg.connect.api.Writer;
import io.tabular.iceberg.connect.channel.CommitterImpl;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterImpl;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -50,26 +51,11 @@ public void start(Map<String, String> props) {
this.config = new IcebergSinkConfig(props);
}

private <C> void close(C closeable) {
if (closeable != null) {
if (closeable instanceof AutoCloseable) {
try {
((AutoCloseable) closeable).close();
} catch (Exception e) {
LOG.warn(
"An error occurred while trying to close {} instance, ignoring...",
closeable.getClass().getSimpleName(),
e);
}
}
}
}

private void clearState() {
close(writer);
Utilities.close(writer);
writer = null;

close(committer);
Utilities.close(committer);
committer = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import io.tabular.iceberg.connect.events.EventType;
import io.tabular.iceberg.connect.events.TableName;
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -58,7 +57,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitterImpl implements Committer, Closeable {
public class CommitterImpl implements Committer, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(CommitterImpl.class);
private final SinkTaskContext context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.events.CommitCompletePayload;
import io.tabular.iceberg.connect.events.CommitRequestPayload;
import io.tabular.iceberg.connect.events.CommitTablePayload;
import io.tabular.iceberg.connect.events.Event;
import io.tabular.iceberg.connect.events.EventType;
import io.tabular.iceberg.connect.events.TableName;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
Expand All @@ -50,6 +50,7 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -67,7 +68,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class Coordinator implements Closeable {
class Coordinator implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
Expand All @@ -87,6 +88,15 @@ class Coordinator implements Closeable {
private final ExecutorService exec;
private final CommitState commitState;

Coordinator(
IcebergSinkConfig config,
Collection<MemberDescription> members,
ConsumerFactory consumerFactory,
ProducerFactory producerFactory) {
this(config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory);
}

@VisibleForTesting
Coordinator(
IcebergSinkConfig config,
Collection<MemberDescription> members,
Expand Down Expand Up @@ -364,5 +374,6 @@ public void close() throws IOException {
exec.shutdownNow();
producer.close();
consumer.close();
Utilities.close(catalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.Utilities;
import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
Expand Down Expand Up @@ -51,8 +50,7 @@ public CoordinatorThread create(IcebergSinkConfig config) {
if (groupDesc.state() == ConsumerGroupState.STABLE) {
Collection<MemberDescription> members = groupDesc.members();
Coordinator coordinator =
new Coordinator(
config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory);
new Coordinator(config, members, consumerFactory, producerFactory);
thread = new CoordinatorThread(coordinator);
thread.start();
LOG.info("Started commit coordinator");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergWriterFactory {
public class IcebergWriterFactory implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(IcebergWriterFactory.class);

private final Catalog catalog;
private final IcebergSinkConfig config;

public IcebergWriterFactory(Catalog catalog, IcebergSinkConfig config) {
public IcebergWriterFactory(IcebergSinkConfig config) {
this(config, Utilities.loadCatalog(config));
}

@VisibleForTesting
IcebergWriterFactory(IcebergSinkConfig config, Catalog catalog) {
this.catalog = catalog;
this.config = config;
}
Expand Down Expand Up @@ -109,4 +114,9 @@ Table autoCreateTable(String tableName, SinkRecord sample) {
});
return result.get();
}

@Override
public void close() throws Exception {
Utilities.close(catalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,5 +264,20 @@ private static Class<?> dynamicallyLoad(String className) {
return configClass;
}

public static <C> void close(C closeable) {
if (closeable != null) {
if (closeable instanceof AutoCloseable) {
try {
((AutoCloseable) closeable).close();
} catch (Exception e) {
LOG.warn(
"An error occurred while trying to close {} instance, ignoring...",
closeable.getClass().getSimpleName(),
e);
}
}
}
}

private Utilities() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.api.Committable;
import io.tabular.iceberg.connect.api.Writer;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
Expand All @@ -35,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriterImpl implements Writer, Closeable {
public class WriterImpl implements Writer, AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
private final IcebergSinkConfig config;
Expand All @@ -44,7 +43,7 @@ public class WriterImpl implements Writer, Closeable {
private final Map<TopicPartition, Offset> offsetsByTopicPartition;

public WriterImpl(SinkTaskContext context, IcebergSinkConfig config) {
this(context, config, new IcebergWriterFactory(Utilities.loadCatalog(config), config));
this(context, config, new IcebergWriterFactory(config));
}

@VisibleForTesting
Expand Down Expand Up @@ -172,5 +171,6 @@ private void clearOffsets() {
public void close() throws IOException {
clearWriters();
clearOffsets();
Utilities.close(writerFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void testAutoCreateTable(boolean partitioned) {
SinkRecord record = mock(SinkRecord.class);
when(record.value()).thenReturn(ImmutableMap.of("id", 123, "data", "foo2"));

IcebergWriterFactory factory = new IcebergWriterFactory(catalog, config);
IcebergWriterFactory factory = new IcebergWriterFactory(config, catalog);
factory.autoCreateTable("db.tbl", record);

ArgumentCaptor<TableIdentifier> identCaptor = ArgumentCaptor.forClass(TableIdentifier.class);
Expand Down

0 comments on commit faa2a2b

Please sign in to comment.