Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create source_table and changefeed in producer code, update spring bo… #85

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.crdb.cdc.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.management.timer.Timer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.stream.Collectors;

@Component
public class ConsumerTableInitializer implements CommandLineRunner {

@Autowired
private ResourceLoader resourceLoader;
private final Logger log = LoggerFactory.getLogger(getClass());

@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void run(String... args) throws Exception {
Resource resource = resourceLoader.getResource("classpath:schema.sql");
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) {
String sqlScript = reader.lines().collect(Collectors.joining("\n"));
jdbcTemplate.execute(sqlScript);
log.debug("executing sql statement ");
} catch (IOException e) {
throw new RuntimeException("Failed to read SQL script", e);
}
}
}
6 changes: 3 additions & 3 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
container_name: roach-source
hostname: roach-source
image: cockroachdb/cockroach:latest
command: start-single-node --cluster-name=roach-source --logtostderr=WARNING --insecure
command: start-single-node --insecure
ports:
- "8080:8080"
- "26257:26257"
Expand All @@ -15,7 +15,7 @@ services:
container_name: roach-destination
hostname: roach-destination
image: cockroachdb/cockroach:latest
command: start-single-node --cluster-name=roach-destination --logtostderr=WARNING --insecure
command: start-single-node --insecure
ports:
- "8081:8080"
- "26258:26257"
Expand Down Expand Up @@ -43,4 +43,4 @@ services:
KAFKA_ADVERTISED_LISTENERS: LISTENER_A://kafka:29092,LISTENER_B://kafka:9092,LISTENER_C://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_A:PLAINTEXT,LISTENER_B:PLAINTEXT,LISTENER_C:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_A
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
5 changes: 2 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>19</java.version>
<spring.boot.version>3.1.0</spring.boot.version>
<spring.boot.version>3.3.5</spring.boot.version>
<maven.compiler.source>19</maven.compiler.source>
<maven.compiler.target>19</maven.compiler.target>
<skipTests>true</skipTests>
Expand All @@ -39,8 +39,7 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<release>${maven.compiler.target}</release>
</configuration>
</plugin>
<plugin>
Expand Down
27 changes: 27 additions & 0 deletions producer/src/main/java/io/crdb/cdc/producer/Producer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;


@Component
public class Producer implements ApplicationRunner {

Expand All @@ -25,6 +27,10 @@ public class Producer implements ApplicationRunner {
private static final String INSERT = "INSERT INTO source_table (id, balance, created_timestamp) VALUES (?, ?, ?)";
private static final String UPDATE = "UPDATE source_table SET balance = ? WHERE id = ?";
private static final String DELETE = "DELETE FROM source_table WHERE id = ?";
private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS source_table (id UUID PRIMARY KEY, balance INT, created_timestamp TIMESTAMP);";

private static final String CREATE_CHANGE = "CREATE CHANGEFEED FOR TABLE source_table INTO 'kafka://kafka:9092';";
private static final String SHOW_JOBS = "SHOW CHANGEFEED JOBS;";

private final DataSource dataSource;

Expand All @@ -39,6 +45,26 @@ public void run(ApplicationArguments args) throws Exception {
log.info("sleeping for 1 minute before executing sql statements on the 'source` database...");

Thread.sleep(Timer.ONE_MINUTE);
final Connection conn = dataSource.getConnection();
log.info("creating source table...");
final PreparedStatement tableStatement = conn.prepareStatement(CREATE_TABLE);
tableStatement.executeUpdate();

log.info("checking for changefeed already created...");
final PreparedStatement showJobsStatement = conn.prepareStatement(SHOW_JOBS);
ResultSet rs = showJobsStatement.executeQuery();
int rowCount = 0;
while (rs.next()) {
rowCount++;
}
rs.close();
showJobsStatement.close();
log.info("number of rows from " + SHOW_JOBS + " " + rowCount);
if (rowCount == 0) {
log.info("creating changefeed...");
final PreparedStatement changefeedStatement = conn.prepareStatement(CREATE_CHANGE);
changefeedStatement.executeQuery();
}

int globalOperationCounter = 1;

Expand Down Expand Up @@ -99,4 +125,5 @@ public void run(ApplicationArguments args) throws Exception {
log.info("sleeping for 1 minute before shutting down Producer service...");
Thread.sleep(Timer.ONE_MINUTE);
}

}
Loading