Handling restarts in an Apache Beam Debezium application #28248

@sjay1728

I have an Apache Beam application that uses Debezium (through DebeziumIO) to capture data changes. Unlike the typical setup, I am not using Debezium together with Kafka. The problem is with restarts: whenever I restart the application, it reads all the data again from the very beginning. I want the application to resume consuming from where it left off before the restart. I would greatly appreciate any guidance or insights on how to achieve this. Here is the structure of my application:

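import io.debezium.connector.postgresql.PostgresConnector;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class PipelineTest {

    // Connector settings for a PostgreSQL source (credentials and names redacted).
    private static final DebeziumIO.ConnectorConfiguration postgresConnectorConfig =
            DebeziumIO.ConnectorConfiguration
                    .create()
                    .withUsername("xxxxxxxx")
                    .withPassword("xxxxxxxx")
                    .withHostName("localhost")
                    .withPort("5432")
                    .withConnectorClass(PostgresConnector.class)
                    .withConnectionProperty("database.dbname", "xxxxxxxx")
                    .withConnectionProperty("database.server.id", "xxxxxxxx")
                    .withConnectionProperty("database.server.name", "xxxxxxxx")
                    .withConnectionProperty("plugin.name", "pgoutput")
                    // Replication slot whose position is inspected in pg_replication_slots.
                    .withConnectionProperty("slot.name", "debezium5")
                    .withConnectionProperty("table.include.list", "public.table1");

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // Run locally with the Direct Runner
        Pipeline pipeline = Pipeline.create(options);

        System.out.println("Pipeline starting...");

        // Read change events and convert each SourceRecord value to a String.
        PCollection<String> records = pipeline
                .apply(DebeziumIO.<String>read()
                        .withConnectorConfiguration(postgresConnectorConfig)
                        .withFormatFunction(sourceRecord ->
                                sourceRecord.value().toString())
                        .withCoder(StringUtf8Coder.of())
                );

        // Print every change event as it arrives.
        records.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("Received record: " + c.element());
            }
        }));

        pipeline.run().waitUntilFinish();

        System.out.println("Pipeline ending...");
    }
}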

It seems that the following replication slot values do not change at all while the application runs: catalog_xmin, restart_lsn, and confirmed_flush_lsn. I checked them with this query: SELECT * FROM pg_replication_slots;
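To confirm whether the slot position really stays the same, the confirmed_flush_lsn value can be recorded before and after the pipeline has processed some changes and then compared numerically. The following is only a minimal sketch under stated assumptions: the class name LsnCompare and the sample LSN values are hypothetical, the values are assumed to be copied by hand from the query output, and nothing here is part of the Beam or Debezium API. It relies only on the PostgreSQL LSN text format (two hexadecimal numbers separated by a slash).

```java
// Sketch: compare two PostgreSQL LSN strings, for example two readings of
// confirmed_flush_lsn taken from pg_replication_slots at different times.
// LsnCompare is a hypothetical helper, not part of Beam or Debezium.
final class LsnCompare {

    // An LSN is printed as two hexadecimal numbers separated by a slash,
    // e.g. "16/B374D848": the left part holds the high 32 bits of the
    // 64-bit position, the right part holds the low 32 bits.
    static long parseLsn(String lsn) {
        String[] parts = lsn.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + lsn);
        }
        long high = Long.parseLong(parts[0], 16);
        long low = Long.parseLong(parts[1], 16);
        return (high << 32) | low;
    }

    // Returns true when the second reading is strictly ahead of the first.
    // The comparison is unsigned because an LSN uses all 64 bits.
    static boolean hasAdvanced(String before, String after) {
        return Long.compareUnsigned(parseLsn(after), parseLsn(before)) > 0;
    }

    public static void main(String[] args) {
        // Hypothetical sample readings; replace with real query results.
        String before = "0/1A2B3C0";
        String after = "0/1A2B4D8";
        System.out.println("advanced: " + hasAdvanced(before, after));
    }
}
```

If hasAdvanced returns false for readings taken while changes were flowing through the pipeline, that is consistent with the observation that the slot position is not being moved forward.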

I would greatly appreciate any guidance or insights on how to achieve this.

