Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Apr 9, 2024
1 parent eb66c82 commit b693d59
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ public void open(Collection<TopicPartition> partitions) {

@Override
public void put(Collection<SinkRecord> sinkRecords) {
writer.put(sinkRecords);
committer.commit(writer);
if (writer != null && committer != null) {
writer.put(sinkRecords);
committer.commit(writer);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ private Map<TopicPartition, Long> committedOffsets(AdminFactory adminFactory, St
}

private void throwExceptionIfCoordinatorIsTerminated() {
if (isLeader && coordinatorThread.isTerminated()) {
if (isLeader && coordinatorThread != null && coordinatorThread.isTerminated()) {
throw new IllegalStateException("Coordinator unexpectedly terminated");
}
}
Expand Down Expand Up @@ -245,7 +245,7 @@ private <T> void close(T closeable) {
@Override
public void close() throws IOException {
close(commitRequestListener);
if (isLeader) {
if (isLeader && coordinatorThread != null) {
coordinatorThread.terminate();
}
close(producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,24 @@ class CoordinatorThreadFactoryImpl implements CoordinatorThreadFactory {

@Override
public CoordinatorThread create(IcebergSinkConfig config) {
final Collection<MemberDescription> members;
final CoordinatorThread thread;

try (Admin admin = adminFactory.create(config.kafkaProps())) {
ConsumerGroupDescription groupDesc =
KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
if (groupDesc.state() == ConsumerGroupState.STABLE) {
members = groupDesc.members();
Collection<MemberDescription> members = groupDesc.members();
Coordinator coordinator =
new Coordinator(
config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory);
thread = new CoordinatorThread(coordinator);
thread.start();
LOG.info("Started commit coordinator");
} else {
throw new IllegalStateException("Consumer group is not stable, cannot start coordinator");
thread = null;
}
}

Coordinator coordinator =
new Coordinator(
config, members, Utilities.loadCatalog(config), consumerFactory, producerFactory);
CoordinatorThread thread = new CoordinatorThread(coordinator);
thread.start();
LOG.info("Started commit coordinator");
return thread;
}
}

0 comments on commit b693d59

Please sign in to comment.