Commit 444967d
Merge branch 'airbytehq:master' into master
andyh565 authored Oct 15, 2024
2 parents ad7433d + cbcb669 commit 444967d
Showing 1,369 changed files with 143,075 additions and 89,382 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/auto_merge.yml
@@ -2,8 +2,8 @@ name: Auto merge connector PRs Cron

on:
schedule:
# 0AM UTC is 2AM CEST, 3AM EEST, 5PM PDT.
- cron: "0 0 * * *"
# Every 2 hours on the hour.
- cron: "0 */2 * * *"
workflow_dispatch:
jobs:
run_auto_merge:
44 changes: 0 additions & 44 deletions .github/workflows/cdk-codeflash.yml

This file was deleted.

11 changes: 11 additions & 0 deletions .github/workflows/publish-bulk-cdk.yml
@@ -76,6 +76,17 @@ jobs:
gradle-distribution-sha-256-sum-warning: false
arguments: --scan :airbyte-cdk:bulk:bulkCdkBuild

- name: Integration test Bulk CDK
uses: burrunan/gradle-cache-action@v1
env:
CI: true
with:
read-only: true
job-id: bulk-cdk-publish
concurrent: true
gradle-distribution-sha-256-sum-warning: false
arguments: --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest

- name: Publish Poms and Jars to CloudRepo
uses: burrunan/gradle-cache-action@v1
env:
1 change: 1 addition & 0 deletions .prettierignore
@@ -1,6 +1,7 @@
airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output
airbyte-ci/connectors/pipelines/tests/test_changelog/result_files
airbyte-integrations/bases/connector-acceptance-test/unit_tests/data/docs
airbyte-integrations/connectors/destination-*/src/test-integration/resources/expected-spec*.json

# Ignore manifest files in manifest-only connectors
# This is done due to prettier being overly opinionated on the formatting of quotes
4 changes: 4 additions & 0 deletions airbyte-cdk/bulk/build.gradle
@@ -61,6 +61,10 @@ allprojects {
}
}

tasks.register('bulkCdkIntegrationTest').configure {
dependsOn allprojects.collect {it.tasks.matching { it.name == 'integrationTest' }}
}

if (buildNumberFile.exists()) {
tasks.register('bulkCdkBuild').configure {
dependsOn allprojects.collect {it.tasks.named('build')}
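
The new `bulkCdkIntegrationTest` aggregate task lazily collects every subproject task named `integrationTest` (via `tasks.matching`), which is what lets the `publish-bulk-cdk.yml` step above run all Bulk CDK integration tests in a single invocation, roughly equivalent to running `./gradlew --scan :airbyte-cdk:bulk:bulkCdkIntegrationTest` locally.
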
@@ -53,7 +53,7 @@ private fun resolveValues(
log.warn { "File '$jsonFile' not found for '$cliOptionKey'." }
continue
}
values["$prefix.json"] = jsonFile.readText()
values["$prefix.json"] = jsonFile.readText().replace("$", "\${:$}")
}
return values
}
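
The added `replace` appears to guard against property-placeholder expansion: the file's contents are later run through Micronaut-style `${...}` resolution, where a literal `$` in, say, a password would otherwise be misinterpreted. `${:$}` reads as a placeholder with an empty name and a default value of `$`, so resolution restores the literal dollar. A minimal sketch of the assumed round-trip:

```kotlin
// Escape literal dollars exactly as the diff above does.
fun escapeDollars(raw: String): String = raw.replace("$", "\${:$}")

fun main() {
    val secret = """{"password":"pa${'$'}${'$'}word"}"""
    println(escapeDollars(secret)) // {"password":"pa${:$}${:$}word"}
    // Assumed resolver behavior (not shown in this diff): ${:$} is an
    // empty-named placeholder defaulting to '$', so expansion restores
    // the original literal dollars.
}
```
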
@@ -12,26 +12,26 @@ import io.airbyte.protocol.models.JsonSchemaType
* This maps to the subset of [JsonSchemaType] which is used in practice. Its main reason for
* existing is to provide type-safety and convenient comparisons and string representations.
*/
sealed interface AirbyteType {
sealed interface AirbyteSchemaType {
/** Unwraps the underlying Airbyte protocol type object. */
fun asJsonSchemaType(): JsonSchemaType

/** Convenience method to generate the JSON Schema object. */
fun asJsonSchema(): JsonNode = Jsons.valueToTree(asJsonSchemaType().jsonSchemaTypeMap)
}

data class ArrayAirbyteType(
val item: AirbyteType,
) : AirbyteType {
data class ArrayAirbyteSchemaType(
val item: AirbyteSchemaType,
) : AirbyteSchemaType {
override fun asJsonSchemaType(): JsonSchemaType =
JsonSchemaType.builder(JsonSchemaPrimitiveUtil.JsonSchemaPrimitive.ARRAY)
.withItems(item.asJsonSchemaType())
.build()
}

enum class LeafAirbyteType(
enum class LeafAirbyteSchemaType(
private val jsonSchemaType: JsonSchemaType,
) : AirbyteType {
) : AirbyteSchemaType {
BOOLEAN(JsonSchemaType.BOOLEAN),
STRING(JsonSchemaType.STRING),
BINARY(JsonSchemaType.STRING_BASE_64),
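
A small usage sketch of the renamed hierarchy, using only names visible in this hunk (if the full file defines further `AirbyteSchemaType` subtypes, the `when` below would need more branches):

```kotlin
// Compose an array-of-strings schema and render it as JSON Schema.
val arrayOfStrings: AirbyteSchemaType =
    ArrayAirbyteSchemaType(item = LeafAirbyteSchemaType.STRING)
val jsonSchema = arrayOfStrings.asJsonSchema() // roughly {"type":"array","items":{"type":"string"}}

// The sealed interface gives exhaustive, type-safe dispatch:
fun describe(type: AirbyteSchemaType): String =
    when (type) {
        is LeafAirbyteSchemaType -> type.name
        is ArrayAirbyteSchemaType -> "array<${describe(type.item)}>"
    }
```
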
@@ -7,7 +7,7 @@
<Property name="container-log-pattern">%d{yyyy-MM-dd'T'HH:mm:ss,SSS}{GMT+0}`%replace{%X{log_source}}{^ -}{} > %replace{%m}{$${env:LOG_SCRUB_PATTERN:-\*\*\*\*\*}}{*****}%n</Property>
<!-- Always log INFO by default. -->
<Property name="log-level">${sys:LOG_LEVEL:-${env:LOG_LEVEL:-INFO}}</Property>
<Property name="logDir">target/test-logs/${date:yyyy-MM-dd'T'HH:mm:ss}</Property>
<Property name="logDir">build/test-logs/${date:yyyy-MM-dd'T'HH:mm:ss}</Property>
</Properties>

<Appenders>
@@ -10,6 +10,9 @@ interface SourceConfiguration : Configuration, SshTunnelConfiguration {
/** Does READ generate states of type GLOBAL? */
val global: Boolean

/** Maximum snapshotting duration; may be set to limit the overall snapshot read time. */
val maxSnapshotReadDuration: Duration?

/** During the READ operation, how often a feed should checkpoint, ideally. */
val checkpointTargetInterval: Duration

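
A hedged sketch of how a connector might honor the new knob; everything here other than the `maxSnapshotReadDuration` property itself is invented for illustration:

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper: true once the optional snapshot budget is exhausted.
// A null duration means snapshotting is unbounded.
fun snapshotDeadlineReached(start: Instant, maxSnapshotReadDuration: Duration?): Boolean =
    maxSnapshotReadDuration != null &&
        Duration.between(start, Instant.now()) >= maxSnapshotReadDuration
```
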
@@ -20,7 +20,7 @@ interface AirbyteStreamFactory {
discoveredStream.id.name,
discoveredStream.id.namespace,
discoveredStream.columns.map {
AirbyteField.of(it.id, it.type.airbyteType.asJsonSchemaType())
AirbyteField.of(it.id, it.type.airbyteSchemaType.asJsonSchemaType())
},
)
}
@@ -1,12 +1,12 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.discover

import io.airbyte.cdk.data.AirbyteType
import io.airbyte.cdk.data.AirbyteSchemaType
import io.airbyte.cdk.data.IntCodec
import io.airbyte.cdk.data.JsonDecoder
import io.airbyte.cdk.data.JsonEncoder
import io.airbyte.cdk.data.JsonStringCodec
import io.airbyte.cdk.data.LeafAirbyteType
import io.airbyte.cdk.data.LeafAirbyteSchemaType
import io.airbyte.cdk.data.OffsetDateTimeCodec
import java.time.OffsetDateTime

@@ -23,7 +23,7 @@ sealed interface FieldOrMetaField {
*/
interface FieldType {
/** maps to [io.airbyte.protocol.models.Field.type] */
val airbyteType: AirbyteType
val airbyteSchemaType: AirbyteSchemaType
val jsonEncoder: JsonEncoder<*>
}

@@ -73,19 +73,20 @@ enum class CommonMetaField(
}

data object CdcStringMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.STRING
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.STRING
override val jsonEncoder: JsonEncoder<String> = JsonStringCodec
override val jsonDecoder: JsonDecoder<String> = JsonStringCodec
}

data object CdcIntegerMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.INTEGER
override val airbyteSchemaType: AirbyteSchemaType = LeafAirbyteSchemaType.INTEGER
override val jsonEncoder: JsonEncoder<Int> = IntCodec
override val jsonDecoder: JsonDecoder<Int> = IntCodec
}

data object CdcOffsetDateTimeMetaFieldType : LosslessFieldType {
override val airbyteType: AirbyteType = LeafAirbyteType.TIMESTAMP_WITH_TIMEZONE
override val airbyteSchemaType: AirbyteSchemaType =
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE
override val jsonEncoder: JsonEncoder<OffsetDateTime> = OffsetDateTimeCodec
override val jsonDecoder: JsonDecoder<OffsetDateTime> = OffsetDateTimeCodec
}
@@ -2,7 +2,7 @@
package io.airbyte.cdk.output

import io.airbyte.cdk.StreamIdentifier
import io.airbyte.cdk.data.AirbyteType
import io.airbyte.cdk.data.AirbyteSchemaType
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.DefaultImplementation
import jakarta.inject.Singleton
@@ -43,8 +43,8 @@ data class FieldNotFound(
data class FieldTypeMismatch(
override val streamID: StreamIdentifier,
val fieldName: String,
val expected: AirbyteType,
val actual: AirbyteType,
val expected: AirbyteSchemaType,
val actual: AirbyteSchemaType,
) : CatalogValidationFailure

data class InvalidPrimaryKey(
@@ -44,3 +44,11 @@ data class Stream(
override val label: String
get() = id.toString()
}

/** List of [Stream]s this [Feed] emits records for. */
val Feed.streams
get() =
when (this) {
is Global -> streams
is Stream -> listOf(this)
}
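
A one-line usage sketch: the new extension lets callers treat both feed kinds uniformly (`Global.streams` and `Stream.label` appear elsewhere in this diff):

```kotlin
// Collect the labels of every stream a feed emits records for.
fun feedStreamLabels(feed: Feed): List<String> = feed.streams.map { it.label }
```
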
@@ -2,11 +2,9 @@
package io.airbyte.cdk.read

import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.asProtocolStreamDescriptor
import io.airbyte.cdk.command.OpaqueStateValue
import io.airbyte.cdk.util.ThreadRenamingCoroutineName
import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
@@ -46,7 +44,7 @@ class FeedReader(
// Publish a checkpoint if applicable.
maybeCheckpoint()
// Publish stream completion.
emitStreamStatus(AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
root.streamStatusManager.notifyComplete(feed)
break
}
// Launch coroutines which read from each partition.
@@ -85,7 +83,7 @@
acquirePartitionsCreatorResources(partitionsCreatorID, partitionsCreator)
}
if (1L == partitionsCreatorID) {
emitStreamStatus(AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)
root.streamStatusManager.notifyStarting(feed)
}
return withContext(ctx("round-$partitionsCreatorID-create-partitions")) {
createPartitionsWithResources(partitionsCreatorID, partitionsCreator)
@@ -309,14 +307,4 @@
root.outputConsumer.accept(stateMessage)
}
}

private fun emitStreamStatus(status: AirbyteStreamStatusTraceMessage.AirbyteStreamStatus) {
if (feed is Stream) {
root.outputConsumer.accept(
AirbyteStreamStatusTraceMessage()
.withStreamDescriptor(feed.id.asProtocolStreamDescriptor())
.withStatus(status),
)
}
}
}
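
`StreamStatusManager` itself is not part of this excerpt. Judging only from its call sites (`notifyStarting`, `notifyComplete`, `notifyFailure` here, plus the constructor call in `RootReader` below), its surface is roughly the following hypothetical reconstruction; the `INCOMPLETE` status for failures and the per-stream fan-out are assumptions:

```kotlin
// Hypothetical sketch reconstructed from call sites only; the real class may
// differ, e.g. by deduplicating statuses for streams shared between a Global
// feed and its Stream feeds (presumably why it takes the full feed list).
class StreamStatusManager(
    private val feeds: List<Feed>,
    private val emit: (AirbyteStreamStatusTraceMessage) -> Unit,
) {
    private fun emitFor(feed: Feed, status: AirbyteStreamStatusTraceMessage.AirbyteStreamStatus) {
        for (stream in feed.streams) {
            emit(
                AirbyteStreamStatusTraceMessage()
                    .withStreamDescriptor(stream.id.asProtocolStreamDescriptor())
                    .withStatus(status),
            )
        }
    }

    fun notifyStarting(feed: Feed) = emitFor(feed, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.STARTED)
    fun notifyComplete(feed: Feed) = emitFor(feed, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE)
    fun notifyFailure(feed: Feed) = emitFor(feed, AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE)
}
```
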
@@ -1,13 +1,15 @@
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
package io.airbyte.cdk.read

import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.util.ThreadRenamingCoroutineName
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.CoroutineContext
import kotlin.time.toKotlinDuration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
@@ -51,6 +53,8 @@ class RootReader(
}
}

val streamStatusManager = StreamStatusManager(stateManager.feeds, outputConsumer::accept)

/** Reads records from all [Feed]s. */
suspend fun read(listener: suspend (Map<Feed, Job>) -> Unit = {}) {
supervisorScope {
@@ -60,42 +64,37 @@
val feedJobs: Map<Feed, Job> =
feeds.associateWith { feed: Feed ->
val coroutineName = ThreadRenamingCoroutineName(feed.label)
val handler = FeedExceptionHandler(feed, exceptions)
val handler = FeedExceptionHandler(feed, streamStatusManager, exceptions)
launch(coroutineName + handler) { FeedReader(this@RootReader, feed).read() }
}
// Call listener hook.
listener(feedJobs)
// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}
// Cancel any incomplete global feed job whose stream feed jobs have not all succeeded.
for ((global, globalJob) in feedJobs) {
if (global !is Global) continue
if (globalJob.isCompleted) continue
val globalStreamExceptions: List<Throwable> =
global.streams.mapNotNull { streamExceptions[it] }
if (globalStreamExceptions.isNotEmpty()) {
val cause: Throwable =
globalStreamExceptions.reduce { acc: Throwable, exception: Throwable ->
acc.addSuppressed(exception)
acc
}
globalJob.cancel("at least one stream did non complete", cause)
}
}
// Join on all global feeds and collect caught exceptions.
val globalExceptions: Map<Global, Throwable?> =
feeds.filterIsInstance<Global>().associateWith {
feedJobs[it]?.join()
exceptions[it]
}

// Certain errors on the global feed cause a full stop to all stream reads
if (globalExceptions.values.filterIsInstance<ConfigErrorException>().isNotEmpty()) {
this@supervisorScope.cancel()
}

// Join on all stream feeds and collect caught exceptions.
val streamExceptions: Map<Stream, Throwable?> =
feeds.filterIsInstance<Stream>().associateWith {
try {
feedJobs[it]?.join()
exceptions[it]
} catch (_: CancellationException) {
null
}
}
// Reduce and throw any caught exceptions.
val caughtExceptions: List<Throwable> =
streamExceptions.values.mapNotNull { it } +
globalExceptions.values.mapNotNull { it }
globalExceptions.values.mapNotNull { it } +
streamExceptions.values.mapNotNull { it }
if (caughtExceptions.isNotEmpty()) {
val cause: Throwable =
caughtExceptions.reduce { acc: Throwable, exception: Throwable ->
@@ -109,6 +108,7 @@

class FeedExceptionHandler(
val feed: Feed,
val streamStatusManager: StreamStatusManager,
private val exceptions: ConcurrentHashMap<Feed, Throwable>,
) : CoroutineExceptionHandler {
private val log = KotlinLogging.logger {}
@@ -121,6 +121,7 @@
exception: Throwable,
) {
log.warn(exception) { "canceled feed '${feed.label}' due to thrown exception" }
streamStatusManager.notifyFailure(feed)
exceptions[feed] = exception
}
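
The reordering in `read()` matters: global feed jobs are now joined *before* stream jobs, so a `ConfigErrorException` on the global feed cancels the whole scope and stops stream reads promptly, and the stream joins treat the resulting cancellation as non-fatal. A reduced, self-contained illustration of that pattern (all names below are invented for the demo):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    try {
        supervisorScope {
            val streamJob = launch { delay(60_000) } // stands in for a long stream read
            val globalHitConfigError = true // pretend the global feed failed this way
            if (globalHitConfigError) cancel("full stop to all stream reads")
            try {
                streamJob.join()
            } catch (_: CancellationException) {
                // expected: the scope (and this coroutine) was cancelled above
            }
        }
    } catch (_: CancellationException) {
        // the cancelled supervisorScope rethrows; swallowed here for the demo
    }
    println("stream reads stopped promptly")
}
```
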
