Skip to content

Commit 5ce24af

Browse files
committed
fix: add recent job filter for stream status query (#18515)
1 parent 270f7fd commit 5ce24af

File tree

4 files changed

+82
-6
lines changed

4 files changed

+82
-6
lines changed

airbyte-featureflag/src/main/kotlin/FlagDefinitions.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,3 +261,5 @@ object BypassStiggEntitlementChecks : Permanent<Boolean>(key = "platform.bypass-
261261
object EnableDataWorkerUsage : Temporary<Boolean>(key = "platform.enable-data-worker-usage", default = false)
262262

263263
object UseVerifiedDomainsForSsoActivate : Temporary<Boolean>(key = "platform.use-verified-domains-for-sso-activate", default = false)
264+
265+
object UseOptimizedStreamStatusQuery : Temporary<Boolean>(key = "platform.use-optimized-stream-status-query", default = false)

airbyte-server/src/main/kotlin/io/airbyte/server/handlers/StreamStatusesHandler.kt

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import io.airbyte.api.model.generated.StreamStatusUpdateRequestBody
1919
import io.airbyte.commons.server.handlers.JobHistoryHandler
2020
import io.airbyte.commons.server.handlers.helpers.StatsAggregationHelper
2121
import io.airbyte.config.Job
22+
import io.airbyte.featureflag.Connection
23+
import io.airbyte.featureflag.FeatureFlagClient
24+
import io.airbyte.featureflag.UseOptimizedStreamStatusQuery
2225
import io.airbyte.persistence.job.JobPersistence
2326
import io.airbyte.server.handlers.apidomainmapping.StreamStatusesMapper
2427
import io.airbyte.server.repositories.StreamStatusesRepository
@@ -34,6 +37,7 @@ open class StreamStatusesHandler(
3437
val mapper: StreamStatusesMapper,
3538
private val jobHistoryHandler: JobHistoryHandler,
3639
private val jobPersistence: JobPersistence,
40+
private val featureFlagClient: FeatureFlagClient,
3741
) {
3842
fun createStreamStatus(req: StreamStatusCreateRequestBody): StreamStatusRead {
3943
val model = mapper.map(req)
@@ -64,14 +68,25 @@ open class StreamStatusesHandler(
6468
}
6569

6670
fun listStreamStatusPerRunState(req: ConnectionIdRequestBody): StreamStatusReadList {
67-
val apiList =
68-
repo
69-
.findAllPerRunStateByConnectionId(req.connectionId)
70-
.map { domain -> mapper.map(domain) }
71+
val useOptimized = featureFlagClient.boolVariation(UseOptimizedStreamStatusQuery, Connection(req.connectionId))
7172

73+
val streamStatuses =
74+
if (useOptimized) {
75+
// Optimized: same query but with recency filter (last 100 jobs)
76+
// Reduces query time from 2+ min to ~18 sec for high-volume connections
77+
repo.findAllPerRunStateByConnectionIdWithRecentJobsFilter(req.connectionId, RECENT_JOBS_LIMIT)
78+
} else {
79+
repo.findAllPerRunStateByConnectionId(req.connectionId)
80+
}
81+
82+
val apiList = streamStatuses.map { domain -> mapper.map(domain) }
7283
return StreamStatusReadList().streamStatuses(apiList)
7384
}
7485

86+
companion object {
87+
const val RECENT_JOBS_LIMIT = 100
88+
}
89+
7590
fun mapStreamStatusToSyncReadResult(streamStatus: StreamStatusRead): ConnectionSyncResultRead {
7691
val jobStatus =
7792
if (streamStatus.runState == StreamStatusRunState.COMPLETE) {

airbyte-server/src/main/kotlin/io/airbyte/server/repositories/StreamStatusesRepository.kt

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,41 @@ abstract class StreamStatusesRepository :
115115
)
116116
abstract fun findAllPerRunStateByConnectionId(connectionId: UUID): List<StreamStatus>
117117

118+
/**
119+
* Returns the latest stream status per run state (and job type) for a connection,
120+
* limited to recent jobs for better performance on high-volume connections.
121+
*
122+
* This is the same logic as findAllPerRunStateByConnectionId but with a recency filter
123+
* that limits processing to the last N jobs, reducing query time from 2+ minutes to ~18 seconds
124+
* for connections with millions of stream_statuses.
125+
*/
126+
@Query(
127+
"""
128+
SELECT DISTINCT ON (ss.stream_name, ss.stream_namespace, 1 /* computed run_state */, 2 /* computed incomplete_run_cause */, ss.job_type)
129+
$STREAM_STATUS_WITH_FALLBACKS
130+
FROM stream_statuses ss
131+
JOIN jobs j on j.id = ss.job_id
132+
WHERE ss.connection_id = :connectionId
133+
AND ss.job_id >= (
134+
SELECT COALESCE(MIN(id), 0)
135+
FROM (
136+
SELECT id FROM jobs
137+
WHERE scope = CAST(:connectionId AS VARCHAR)
138+
AND config_type IN ('sync', 'refresh')
139+
ORDER BY id DESC
140+
LIMIT :recentJobsLimit
141+
) recent_jobs
142+
)
143+
ORDER BY
144+
ss.job_type, ss.stream_name, ss.stream_namespace,
145+
1 /* computed run_state */,2 /* computed incomplete_run_cause */, ss.transitioned_at desc
146+
""",
147+
)
148+
abstract fun findAllPerRunStateByConnectionIdWithRecentJobsFilter(
149+
connectionId: UUID,
150+
recentJobsLimit: Int,
151+
): List<StreamStatus>
152+
118153
@Query(
119154
"""
120155
SELECT

airbyte-server/src/test/kotlin/io/airbyte/server/handlers/StreamStatusesHandlerTest.kt

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ import io.airbyte.config.StreamSyncStats
2929
import io.airbyte.config.SyncStats
3030
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusJobType
3131
import io.airbyte.db.instance.jobs.jooq.generated.enums.JobStreamStatusRunState
32+
import io.airbyte.featureflag.Connection
33+
import io.airbyte.featureflag.FeatureFlagClient
34+
import io.airbyte.featureflag.UseOptimizedStreamStatusQuery
3235
import io.airbyte.persistence.job.JobPersistence
3336
import io.airbyte.server.handlers.apidomainmapping.StreamStatusesMapper
3437
import io.airbyte.server.repositories.StreamStatusesRepository
@@ -51,14 +54,16 @@ internal class StreamStatusesHandlerTest {
5154
private lateinit var handler: StreamStatusesHandler
5255
private lateinit var jobPersistence: JobPersistence
5356
private lateinit var jobHistoryHandler: JobHistoryHandler
57+
private lateinit var featureFlagClient: FeatureFlagClient
5458

5559
@BeforeEach
5660
fun setup() {
5761
repo = mockk()
5862
mapper = mockk()
5963
jobHistoryHandler = mockk()
6064
jobPersistence = mockk()
61-
handler = StreamStatusesHandler(repo, mapper, jobHistoryHandler, jobPersistence)
65+
featureFlagClient = mockk()
66+
handler = StreamStatusesHandler(repo, mapper, jobHistoryHandler, jobPersistence, featureFlagClient)
6267
}
6368

6469
@Test
@@ -107,19 +112,37 @@ internal class StreamStatusesHandlerTest {
107112
}
108113

109114
@Test
110-
fun testListPerRunState() {
115+
fun testListPerRunStateOriginal() {
111116
val connectionId = UUID.randomUUID()
112117
val apiReq = ConnectionIdRequestBody().connectionId(connectionId)
113118
val domainItem = StreamStatusBuilder().build()
114119
val apiItem = StreamStatusRead()
115120
val apiResp = StreamStatusReadList().streamStatuses(listOf<@Valid StreamStatusRead?>(apiItem))
116121

122+
// Use original implementation (feature flag disabled)
123+
every { featureFlagClient.boolVariation(UseOptimizedStreamStatusQuery, Connection(connectionId)) } returns false
117124
every { repo.findAllPerRunStateByConnectionId(connectionId) } returns listOf(domainItem)
118125
every { mapper.map(domainItem) } returns apiItem
119126

120127
Assertions.assertEquals(apiResp, handler.listStreamStatusPerRunState(apiReq))
121128
}
122129

130+
@Test
131+
fun testListPerRunStateOptimized() {
132+
val connectionId = UUID.randomUUID()
133+
val apiReq = ConnectionIdRequestBody().connectionId(connectionId)
134+
val domainItem = StreamStatusBuilder().build()
135+
val apiItem = StreamStatusRead()
136+
val apiResp = StreamStatusReadList().streamStatuses(listOf<@Valid StreamStatusRead?>(apiItem))
137+
138+
// Use optimized implementation (feature flag enabled)
139+
every { featureFlagClient.boolVariation(UseOptimizedStreamStatusQuery, Connection(connectionId)) } returns true
140+
every { repo.findAllPerRunStateByConnectionIdWithRecentJobsFilter(connectionId, 100) } returns listOf(domainItem)
141+
every { mapper.map(domainItem) } returns apiItem
142+
143+
Assertions.assertEquals(apiResp, handler.listStreamStatusPerRunState(apiReq))
144+
}
145+
123146
@Test
124147
fun testUptimeHistory() {
125148
val connectionId = UUID.randomUUID()
@@ -379,6 +402,7 @@ internal class StreamStatusesHandlerTest {
379402
StreamStatusesMapper(),
380403
jobHistoryHandler,
381404
jobPersistence,
405+
featureFlagClient,
382406
)
383407
Assertions.assertEquals(expected, handlerWithRealMapper.getConnectionUptimeHistory(apiReq))
384408
}

0 commit comments

Comments
 (0)