Skip to content

Commit

Permalink
Discard op code m change events (#42431)
Browse files Browse the repository at this point in the history
Co-authored-by: Evan Tahler <[email protected]>
  • Loading branch information
rodireich and evantahler authored Jul 23, 2024
1 parent 4cadb34 commit 2a6d6e8
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 476 deletions.
553 changes: 277 additions & 276 deletions airbyte-cdk/java/airbyte-cdk/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.43.1
version=0.43.2
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ class DebeziumRecordIterator<T>(
}

val changeEventWithMetadata = ChangeEventWithMetadata(next)

// #41647: discard event type with op code 'm'
if (!isEventTypeHandled(changeEventWithMetadata)) {
LOGGER.info { "WAL event type not handled: $next" }
continue
}

hasSnapshotFinished = !changeEventWithMetadata.isSnapshotEvent

if (isEventLogged) {
Expand Down Expand Up @@ -325,5 +332,18 @@ class DebeziumRecordIterator<T>(
companion object {
val pollLogMaxTimeInterval: Duration = Duration.ofSeconds(5)
const val POLL_LOG_MAX_CALLS_INTERVAL = 1_000

/**
* We are not interested in message events. According to debezium
* [documentation](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-create-events)
* , possible operation code are: c: create, u: update, d: delete, r: read (applies to only
* snapshots) t: truncate, m: message
*/
fun isEventTypeHandled(event: ChangeEventWithMetadata): Boolean {
event.eventValueAsJson()["op"]?.asText()?.let {
return it in listOf("c", "u", "d", "r", "t")
}
?: return false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import java.util.*
import org.apache.kafka.connect.source.SourceRecord
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import org.mockito.Mockito.mock

class DebeziumRecordIteratorTest {
Expand Down Expand Up @@ -61,10 +63,6 @@ class DebeziumRecordIteratorTest {
override fun destination(): String? {
return null
}

fun sourceRecord(): SourceRecord {
return sourceRecord
}
},
)

Expand All @@ -76,4 +74,48 @@ class DebeziumRecordIteratorTest {
val testConfig = "{\"is_test\": true}"
return mapper.readTree(testConfig)
}

@ParameterizedTest
@CsvSource(
"c, true",
"u, true",
"d, true",
"r, true",
"t, true",
"m, false",
"badVal, false",
"'', false",
)
fun handledEventTypesTest(op: String, handled: Boolean) {
Assertions.assertEquals(
handled,
DebeziumRecordIterator.isEventTypeHandled(
ChangeEventWithMetadata(
object : ChangeEvent<String?, String?> {

private val sourceRecord =
SourceRecord(
null,
Collections.singletonMap("lsn", 358824993496L),
null,
null,
null,
)

override fun key(): String? {
return ""
}

override fun value(): String {
return "{\"op\":\"$op\", \"source\": {\"snapshot\": \"false\"}}"
}

override fun destination(): String? {
return null
}
}
)
)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.43.1'
cdkVersionRequired = '0.43.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.7
dockerImageTag: 3.6.8
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Loading

0 comments on commit 2a6d6e8

Please sign in to comment.