Skip to content

Commit 4846a52

Browse files
add table filtering to mysql (airbytehq#69228)
Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 827b326 commit 4846a52

File tree

7 files changed

+571
-270
lines changed

7 files changed

+571
-270
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
testExecutionConcurrency=1
22
JunitMethodExecutionTimeout=5m
3-
cdkVersion=0.1.76
3+
cdkVersion=0.1.77

airbyte-integrations/connectors/source-mysql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.51.4
12+
dockerImageTag: 3.51.5
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfiguration.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.airbyte.cdk.command.FeatureFlag
88
import io.airbyte.cdk.command.JdbcSourceConfiguration
99
import io.airbyte.cdk.command.SourceConfiguration
1010
import io.airbyte.cdk.command.SourceConfigurationFactory
11+
import io.airbyte.cdk.command.TableFilter
1112
import io.airbyte.cdk.jdbc.SSLCertificateUtils
1213
import io.airbyte.cdk.output.DataChannelMedium
1314
import io.airbyte.cdk.output.DataChannelMedium.SOCKET
@@ -41,6 +42,7 @@ data class MySqlSourceConfiguration(
4142
override val jdbcUrlFmt: String,
4243
override val jdbcProperties: Map<String, String>,
4344
override val namespaces: Set<String>,
45+
override val tableFilters: List<TableFilter>,
4446
val incrementalConfiguration: IncrementalConfiguration,
4547
override val maxConcurrency: Int,
4648
override val resourceAcquisitionHeartbeat: Duration = Duration.ofMillis(100L),
@@ -146,6 +148,22 @@ constructor(
146148
jdbcProperties["useCursorFetch"] = "true"
147149
jdbcProperties["sessionVariables"] = "autocommit=0"
148150

151+
// Only validate table filters if schemas are explicitly configured
152+
val tableFilters = pojo.tableFilters ?: emptyList()
153+
154+
// Convert MySQL TableFilter to JDBC TableFilter for validation
155+
val jdbcTableFilters: List<TableFilter> =
156+
tableFilters.map {
157+
TableFilter().apply {
158+
schemaName = it.databaseName
159+
patterns = it.patterns
160+
}
161+
}
162+
163+
pojo.database.let { schema ->
164+
JdbcSourceConfiguration.validateTableFilters(setOf(schema), jdbcTableFilters)
165+
}
166+
149167
// Internal configuration settings.
150168
val checkpointTargetInterval: Duration =
151169
Duration.ofSeconds(pojo.checkpointTargetIntervalSeconds?.toLong() ?: 0)
@@ -188,6 +206,7 @@ constructor(
188206
jdbcUrlFmt = jdbcUrlFmt,
189207
jdbcProperties = jdbcProperties,
190208
namespaces = setOf(pojo.database),
209+
tableFilters = jdbcTableFilters,
191210
incrementalConfiguration = incremental,
192211
checkpointTargetInterval = checkpointTargetInterval,
193212
maxConcurrency = maxConcurrency,

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceConfigurationSpecification.kt

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ import jakarta.inject.Singleton
4141
@ConfigurationProperties(CONNECTOR_CONFIG_PREFIX)
4242
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
4343
class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
44+
4445
@JsonProperty("host")
4546
@JsonSchemaTitle("Host")
46-
@JsonSchemaInject(json = """{"order":1}""")
4747
@JsonPropertyDescription("Hostname of the database.")
48+
@JsonSchemaInject(json = """{"order":1}""")
4849
lateinit var host: String
4950

5051
@JsonProperty("port")
@@ -74,14 +75,22 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
7475
@JsonSchemaInject(json = """{"order":6,"always_show":true}""")
7576
lateinit var database: String
7677

78+
@JsonProperty("table_filters")
79+
@JsonSchemaTitle("Table Filters")
80+
@JsonPropertyDescription(
81+
"Optional filters to include only specific tables from the specified database."
82+
)
83+
@JsonSchemaInject(json = """{"order":7}""")
84+
var tableFilters: List<TableFilter>? = null
85+
7786
@JsonProperty("jdbc_url_params")
7887
@JsonSchemaTitle("JDBC URL Params")
7988
@JsonPropertyDescription(
8089
"Additional properties to pass to the JDBC URL string when connecting to the database " +
8190
"formatted as 'key=value' pairs separated by the symbol '&'. " +
8291
"(example: key1=value1&key2=value2&key3=value3).",
8392
)
84-
@JsonSchemaInject(json = """{"order":7}""")
93+
@JsonSchemaInject(json = """{"order":8}""")
8594
var jdbcUrlParams: String? = null
8695

8796
@JsonIgnore
@@ -100,7 +109,7 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
100109
@JsonPropertyDescription(
101110
"The encryption method which is used when communicating with the database.",
102111
)
103-
@JsonSchemaInject(json = """{"order":8,"default":"required"}""")
112+
@JsonSchemaInject(json = """{"order":9,"default":"required"}""")
104113
fun getEncryptionValue(): EncryptionSpecification? = encryptionJson ?: encryption.asEncryption()
105114

106115
@JsonIgnore
@@ -120,7 +129,7 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
120129
"Whether to initiate an SSH tunnel before connecting to the database, " +
121130
"and if so, which kind of authentication to use.",
122131
)
123-
@JsonSchemaInject(json = """{"order":9}""")
132+
@JsonSchemaInject(json = """{"order":10}""")
124133
fun getTunnelMethodValue(): SshTunnelMethodConfiguration? =
125134
tunnelMethodJson ?: tunnelMethod.asSshTunnelMethod()
126135

@@ -138,28 +147,28 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
138147
@JsonGetter("replication_method")
139148
@JsonSchemaTitle("Update Method")
140149
@JsonPropertyDescription("Configures how data is extracted from the database.")
141-
@JsonSchemaInject(json = """{"order":10,"display_type":"radio"}""")
150+
@JsonSchemaInject(json = """{"order":11,"display_type":"radio"}""")
142151
fun getIncrementalValue(): IncrementalConfigurationSpecification =
143152
replicationMethodJson ?: replicationMethod.asCursorMethodConfiguration()
144153

145154
@JsonProperty("checkpoint_target_interval_seconds")
146155
@JsonSchemaTitle("Checkpoint Target Time Interval")
147-
@JsonSchemaInject(json = """{"order":11}""")
156+
@JsonSchemaInject(json = """{"order":12}""")
148157
@JsonSchemaDefault("300")
149158
@JsonPropertyDescription("How often (in seconds) a stream should checkpoint, when possible.")
150159
var checkpointTargetIntervalSeconds: Int? = 300
151160

152161
@JsonProperty("concurrency")
153162
@JsonSchemaTitle("Concurrency")
154-
// Hidden and maintened for backwards compatibility.
155-
@JsonSchemaInject(json = """{"order":12,"airbyte_hidden":true}""")
163+
// Hidden and maintained for backwards compatibility.
164+
@JsonSchemaInject(json = """{"order":13,"airbyte_hidden":true}""")
156165
@JsonSchemaDefault("1")
157166
@JsonPropertyDescription("Maximum number of concurrent queries to the database.")
158167
var concurrency: Int? = 1
159168

160169
@JsonProperty("max_db_connections")
161170
@JsonSchemaTitle("Max Concurrent Queries to Database")
162-
@JsonSchemaInject(json = """{"order":12}""")
171+
@JsonSchemaInject(json = """{"order":13}""")
163172
@JsonPropertyDescription(
164173
"Maximum number of concurrent queries to the database. Leave empty to let Airbyte optimize performance."
165174
)
@@ -191,6 +200,29 @@ class MySqlSourceConfigurationSpecification : ConfigurationSpecification() {
191200
}
192201
}
193202

203+
@JsonSchemaTitle("Table Filter")
204+
@JsonSchemaDescription("Inclusion filter configuration for table selection per database.")
205+
@JsonPropertyOrder("database_name", "table_name_patterns")
206+
@SuppressFBWarnings(value = ["NP_NONNULL_RETURN_VIOLATION"], justification = "Micronaut DI")
207+
class TableFilter {
208+
@JsonProperty("database_name", required = true)
209+
@JsonSchemaTitle("Database Name")
210+
@JsonPropertyDescription(
211+
"The name of the database to apply this filter to. " +
212+
"Should match the database defined in the \"Database\" field above."
213+
)
214+
@JsonSchemaInject(json = """{"order":1,"always_show":true}""")
215+
lateinit var databaseName: String
216+
217+
@JsonProperty("table_name_patterns", required = true)
218+
@JsonSchemaTitle("Table Filter Patterns")
219+
@JsonPropertyDescription(
220+
"List of table name patterns to include. Should be a SQL LIKE pattern."
221+
)
222+
@JsonSchemaInject(json = """{"order":2,"always_show":true,"minItems":1}""")
223+
lateinit var patterns: List<String>
224+
}
225+
194226
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "mode")
195227
@JsonSubTypes(
196228
JsonSubTypes.Type(value = EncryptionPreferred::class, name = "preferred"),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
2+
package io.airbyte.integrations.source.mysql
3+
4+
import io.airbyte.cdk.ConfigErrorException
5+
import io.airbyte.cdk.check.JdbcCheckQueries
6+
import io.airbyte.cdk.discover.MetadataQuerier
7+
import io.airbyte.cdk.jdbc.DefaultJdbcConstants
8+
import io.airbyte.cdk.jdbc.JdbcConnectionFactory
9+
import java.sql.Connection
10+
import java.sql.Statement
11+
import org.junit.jupiter.api.Assertions
12+
import org.junit.jupiter.api.BeforeAll
13+
import org.junit.jupiter.api.Test
14+
import org.junit.jupiter.api.Timeout
15+
import org.junit.jupiter.api.assertThrows
16+
import org.testcontainers.containers.MySQLContainer
17+
18+
class MySqlSourceMetadataQuerierTableFilterTest {
19+
companion object {
20+
val dbContainer: MySQLContainer<*> = MySqlContainerFactory.shared(imageName = "mysql:9.2.0")
21+
22+
// In MySQL, database == schema. We test within 'test' database
23+
val databaseName = "test"
24+
val tableNames = listOf("orders", "customers", "products", "invoices")
25+
26+
val factory =
27+
MySqlSourceMetadataQuerier.Factory(
28+
constants = DefaultJdbcConstants(),
29+
selectQueryGenerator = MySqlSourceOperations(),
30+
fieldTypeMapper = MySqlSourceOperations(),
31+
checkQueries = JdbcCheckQueries(),
32+
)
33+
34+
@JvmStatic
35+
@BeforeAll
36+
@Timeout(value = 300)
37+
fun setupDatabase() {
38+
val config: MySqlSourceConfiguration =
39+
MySqlSourceConfigurationFactory().make(MySqlContainerFactory.config(dbContainer))
40+
41+
val connectionFactory = JdbcConnectionFactory(config)
42+
43+
connectionFactory.get().use { connection: Connection ->
44+
connection.isReadOnly = false
45+
connection.createStatement().use {
46+
it.execute("CREATE DATABASE IF NOT EXISTS test")
47+
}
48+
connection.createStatement().use { it.execute("USE test") }
49+
50+
// Create test tables
51+
tableNames.forEach { tableName ->
52+
connection.createStatement().use { stmt: Statement ->
53+
stmt.execute(
54+
"CREATE TABLE IF NOT EXISTS $tableName (id INT PRIMARY KEY, name VARCHAR(255))"
55+
)
56+
}
57+
}
58+
}
59+
}
60+
}
61+
62+
@Test
63+
fun testNoFilter() {
64+
// When no filters are provided, all tables should be returned
65+
val configPojo =
66+
MySqlContainerFactory.config(dbContainer).apply {
67+
database = databaseName
68+
tableFilters = null
69+
}
70+
71+
val config: MySqlSourceConfiguration = MySqlSourceConfigurationFactory().make(configPojo)
72+
73+
factory.session(config).use { mdq: MetadataQuerier ->
74+
// Verify all tables are returned
75+
val streamNames = mdq.streamNames(databaseName).map { it.name }.sorted()
76+
Assertions.assertEquals(tableNames.sorted(), streamNames)
77+
}
78+
}
79+
80+
@Test
81+
fun testTableFilters() {
82+
val tableFilter =
83+
TableFilter().apply {
84+
databaseName = "test"
85+
patterns = listOf("customers", "orders")
86+
}
87+
88+
val configPojo =
89+
MySqlContainerFactory.config(dbContainer).apply {
90+
database = databaseName
91+
tableFilters = listOf(tableFilter)
92+
}
93+
94+
val config: MySqlSourceConfiguration = MySqlSourceConfigurationFactory().make(configPojo)
95+
96+
factory.session(config).use { mdq: MetadataQuerier ->
97+
val querier = mdq as MySqlSourceMetadataQuerier
98+
99+
// Check that memoizedTableNames only contains filtered tables
100+
val filteredTables =
101+
querier.base.memoizedTableNames
102+
.filter { (it.schema ?: it.catalog) == databaseName }
103+
.map { it.name }
104+
.toSet()
105+
106+
Assertions.assertEquals(setOf("customers", "orders"), filteredTables)
107+
}
108+
}
109+
110+
@Test
111+
fun testNoMatchingTables() {
112+
// Test pattern that doesn't match any tables
113+
val tableFilter =
114+
TableFilter().apply {
115+
databaseName = "test"
116+
patterns = listOf("nonexistent%")
117+
}
118+
119+
val configPojo =
120+
MySqlContainerFactory.config(dbContainer).apply {
121+
database = databaseName
122+
tableFilters = listOf(tableFilter)
123+
}
124+
125+
val config: MySqlSourceConfiguration = MySqlSourceConfigurationFactory().make(configPojo)
126+
127+
factory.session(config).use { mdq: MetadataQuerier ->
128+
val querier = mdq as MySqlSourceMetadataQuerier
129+
val filteredTables =
130+
querier.base.memoizedTableNames
131+
.filter { (it.schema ?: it.catalog) == databaseName }
132+
.map { it.name }
133+
.toSet()
134+
135+
Assertions.assertEquals(emptySet<String>(), filteredTables)
136+
}
137+
}
138+
139+
@Test
140+
fun testFilterSchemaNotInConfiguredSchemas() {
141+
// Filter references a schema that is not in the configured schemas list
142+
val tableFilter =
143+
TableFilter().apply {
144+
databaseName = "nonexistent_schema"
145+
patterns = listOf("orders", "customers")
146+
}
147+
148+
val configPojo =
149+
MySqlContainerFactory.config(dbContainer).apply {
150+
database = databaseName
151+
tableFilters = listOf(tableFilter)
152+
}
153+
154+
assertThrows<ConfigErrorException> {
155+
MySqlSourceConfigurationFactory().makeWithoutExceptionHandling(configPojo)
156+
}
157+
}
158+
159+
@Test
160+
fun testWildcardPatterns() {
161+
// Test SQL LIKE wildcards with MySQL
162+
val tableFilter =
163+
TableFilter().apply {
164+
databaseName = "test"
165+
patterns = listOf("c%") // Should match 'customers'
166+
}
167+
168+
val configPojo =
169+
MySqlContainerFactory.config(dbContainer).apply {
170+
database = databaseName
171+
tableFilters = listOf(tableFilter)
172+
}
173+
174+
val config: MySqlSourceConfiguration = MySqlSourceConfigurationFactory().make(configPojo)
175+
176+
factory.session(config).use { mdq: MetadataQuerier ->
177+
val querier = mdq as MySqlSourceMetadataQuerier
178+
val filteredTables =
179+
querier.base.memoizedTableNames
180+
.filter { (it.schema ?: it.catalog) == databaseName }
181+
.map { it.name }
182+
.toSet()
183+
184+
Assertions.assertEquals(setOf("customers"), filteredTables)
185+
}
186+
}
187+
188+
@Test
189+
fun testMultiplePatternsInOneFilter() {
190+
// Test multiple patterns for the same schema
191+
val tableFilter =
192+
TableFilter().apply {
193+
databaseName = "test"
194+
patterns = listOf("orders", "products", "invoices")
195+
}
196+
197+
val configPojo =
198+
MySqlContainerFactory.config(dbContainer).apply {
199+
database = databaseName
200+
tableFilters = listOf(tableFilter)
201+
}
202+
203+
val config: MySqlSourceConfiguration = MySqlSourceConfigurationFactory().make(configPojo)
204+
205+
factory.session(config).use { mdq: MetadataQuerier ->
206+
val querier = mdq as MySqlSourceMetadataQuerier
207+
val filteredTables =
208+
querier.base.memoizedTableNames
209+
.filter { (it.schema ?: it.catalog) == databaseName }
210+
.map { it.name }
211+
.toSet()
212+
213+
Assertions.assertEquals(setOf("orders", "products", "invoices"), filteredTables)
214+
}
215+
}
216+
}

0 commit comments

Comments
 (0)