snowpipe: exactly once semantics #3060

Open
wants to merge 29 commits into base: main
Changes from all commits (29 commits)
71533b7
snowflake: add a test about reopening a channel invalidates old channels
rockwotj Dec 3, 2024
b2f95a2
snowpipe: add lint rules for build options
rockwotj Dec 4, 2024
3d603fb
snowpipe: rename capped package to pool
rockwotj Dec 4, 2024
c9b765e
snowpipe: add indexed pool utility
rockwotj Dec 4, 2024
ba903e8
snowpipe: plumb offset_token around in snowpipe streaming api
rockwotj Dec 4, 2024
4f5e401
snowpipe: support exactly once
rockwotj Dec 4, 2024
781a8a0
snowpipe: extract out schema evolution to seperate struct
rockwotj Dec 5, 2024
4f07d36
snowpipe: fix linter errors
rockwotj Dec 5, 2024
0aca1f4
snowpipe: update docs
rockwotj Dec 5, 2024
7681b5f
snowpipe: cleanup integration test
rockwotj Dec 5, 2024
b0730d6
snowpipe: fix pool tests
rockwotj Dec 5, 2024
6efde47
snowpipe: extract exactly once processing
rockwotj Dec 5, 2024
2c2566d
snowpipe: move schema migration locking to one place
rockwotj Dec 5, 2024
ebec700
snowpipe: extract metrics to another file
rockwotj Dec 5, 2024
3cb41a4
snowpipe: split snowpipe output responsibility
rockwotj Dec 5, 2024
2bf1822
snowpipe: do some basic normalization for table names
rockwotj Dec 5, 2024
cb164ea
snowpipe: support explicitly specifying the channel name
rockwotj Dec 6, 2024
99aeb62
snowflake: move id generation into pool
rockwotj Dec 6, 2024
07841e1
chore: fmt
rockwotj Dec 6, 2024
be70504
snowpipe: prevent races with Close/Connect
rockwotj Dec 6, 2024
cbe3f8a
snowpipe: fix cleanup
rockwotj Dec 9, 2024
e5548c7
snowflake: clarify docs
rockwotj Dec 10, 2024
b62051e
snowflake: add some totally wicked examples
rockwotj Dec 10, 2024
ec62d88
snowpipe: remove application query param which does nothing
rockwotj Dec 16, 2024
790ab20
snowpipe: add metadata for SNOWPIPE_STREAMING_CLIENT_HISTORY
rockwotj Dec 16, 2024
afe38a5
snowpipe: support interpolated tables
rockwotj Dec 17, 2024
af10166
update changelog
rockwotj Dec 17, 2024
b212668
snowpipe: fix table creation for dynamic tables
rockwotj Dec 17, 2024
98b547f
snowpipe: remove SQL query from startup
rockwotj Dec 18, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@ All notable changes to this project will be documented in this file.
### Added

- `avro` scanner now emits metadata for the Avro schema it used along with the schema fingerprint (@rockwotj)
- `snowpipe_streaming` now supports interpolating table names (@rockwotj)
- `snowpipe_streaming` now supports interpolating channel names (@rockwotj)
- `snowpipe_streaming` now supports exactly once delivery using `offset_token` (@rockwotj)

## 4.44.0 - 2024-12-13

207 changes: 174 additions & 33 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -42,9 +42,9 @@ output:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    database: MY_DATABASE # No default (required)
    schema: PUBLIC # No default (required)
    table: MY_TABLE # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
@@ -83,9 +83,9 @@ output:
    account: ORG-ACCOUNT # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    database: MY_DATABASE # No default (required)
    schema: PUBLIC # No default (required)
    table: MY_TABLE # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
@@ -113,7 +113,9 @@ output:
check: ""
processors: [] # No default (optional)
max_in_flight: 4
channel_prefix: "" # No default (optional)
channel_prefix: channel-${HOST} # No default (optional)
channel_name: partition-${!@kafka_partition} # No default (optional)
offset_token: offset-${!"%016X".format(@kafka_offset)} # No default (optional)
```

--
@@ -155,23 +157,70 @@ You can monitor the output batch size using the `snowflake_compressed_output_siz

[tabs]
======
Ingesting data from Redpanda::
Exactly once CDC into Snowflake::
+
--

How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake.
How to send data from a PostgreSQL table into Snowflake exactly once using Postgres Logical Replication.

NOTE: If attempting exactly-once delivery, it's important that rows are delivered to the output in order. Be sure to read the documentation for `offset_token` first.
Removing the `offset_token` is a safer option that instructs Redpanda Connect to use its default at-least-once delivery model instead.

```yaml
input:
  postgres_cdc:
    dsn: postgres://foouser:foopass@localhost:5432/foodb
    schema: "public"
    tables: ["my_pg_table"]
    # We want very large batches - each batch will be sent to Snowflake individually,
    # so to optimize query performance we want files as big as we have memory for.
    batching:
      count: 50000
      period: 45s
    # Prevent multiple batches from being in flight at once, so that we never send a
    # batch while another batch is being retried. This is important to ensure that
    # the Snowflake Snowpipe Streaming channel does not see older data - as it will
    # assume that the older data is already committed.
    checkpoint_limit: 1
output:
  snowflake_streaming:
    # We use the log sequence number from the Postgres WAL to ensure we
    # only upload data exactly once; these values are already lexicographically
    # ordered.
    offset_token: "${!@lsn}"
    # Since we're sending a single ordered log, we can only send one batch
    # at a time to ensure that we're properly incrementing our offset_token
    # and only using a single channel at a time.
    max_in_flight: 1
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MY_PG_TABLE"
    private_key_file: "my/private/key.p8"
```

--
Ingesting data exactly once from Redpanda::
+
--

How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake exactly once.

NOTE: If attempting exactly-once delivery, it's important that records are delivered to the output in order and correctly partitioned. Be sure to read the documentation for
`channel_name` and `offset_token` first. Removing the `offset_token` is a safer option that instructs Redpanda Connect to use its default at-least-once delivery model instead.

```yaml
input:
  kafka_franz:
    seed_brokers: ["redpanda.example.com:9092"]
  redpanda_common:
    topics: ["my_topic_going_to_snow"]
    consumer_group: "redpanda_connect_to_snowflake"
    tls: {enabled: true}
    sasl:
      - mechanism: SCRAM-SHA-256
        username: MY_USER_NAME
        password: "${TODO}"
    # We want very large batches - each batch will be sent to Snowflake individually,
    # so to optimize query performance we want files as big as we have memory for.
    fetch_max_bytes: 100MiB
    fetch_min_bytes: 50MiB
    partition_buffer_bytes: 100MiB
pipeline:
  processors:
    - schema_registry_decode:
@@ -181,25 +230,34 @@ pipeline:
username: MY_USER_NAME
password: "${TODO}"
output:
  snowflake_streaming:
    # By default there is only a single channel per output table allowed
    # if we want to have multiple Redpanda Connect streams writing data
    # then we need a unique channel prefix per stream. We'll use the host
    # name to get unique prefixes in this example.
    channel_prefix: "snowflake-channel-for-${HOST}"
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
    schema_evolution:
      enabled: true
  fallback:
    - snowflake_streaming:
        # To ensure that we write an ordered stream, each partition in Kafka gets its own
        # channel.
        channel_name: "partition-${!@kafka_partition}"
        # Ensure that our offsets are lexicographically sorted in string form by padding with
        # leading zeros.
        offset_token: offset-${!"%016X".format(@kafka_offset)}
        account: "MYSNOW-ACCOUNT"
        user: MYUSER
        role: ACCOUNTADMIN
        database: "MYDATABASE"
        schema: "PUBLIC"
        table: "MYTABLE"
        private_key_file: "my/private/key.p8"
        schema_evolution:
          enabled: true
    # To prevent retries from disrupting the order of delivered records,
    # it's important that failures are immediately sent to a dead letter queue and not retried
    # to Snowflake. See the ordering documentation for the "redpanda" input for more details.
    - retry:
        output:
          redpanda_common:
            topic: "dead_letter_queue"
```

--
HTTP Sidecar to push data to Snowflake::
HTTP Server to push data to Snowflake::
+
--

@@ -231,6 +289,13 @@ output:
schema: "PUBLIC"
table: "MYTABLE"
private_key_file: "my/private/key.p8"
# By default there is only a single channel per output table allowed
# if we want to have multiple Redpanda Connect streams writing data
# then we need a unique channel prefix per stream. We'll use the host
# name to get unique prefixes in this example.
channel_prefix: "snowflake-channel-for-${HOST}"
schema_evolution:
enabled: true
```

--
@@ -282,6 +347,12 @@ The Snowflake database to ingest data into.
*Type*: `string`


```yml
# Examples

database: MY_DATABASE
```

=== `schema`

The Snowflake schema to ingest data into.
@@ -290,14 +361,27 @@ The Snowflake schema to ingest data into.
*Type*: `string`


```yml
# Examples

schema: PUBLIC
```

=== `table`

The Snowflake table to ingest data into.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


```yml
# Examples

table: MY_TABLE
```
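
Because `table` supports interpolation, the destination table can also be derived per message from metadata or the payload. A minimal sketch, assuming a Kafka-style input that sets `kafka_topic` metadata (the topic-to-table mapping here is purely illustrative):

```yml
# Route each message to a table named after its source topic
table: ${! @kafka_topic.uppercase() }
```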

=== `private_key`

The PEM encoded private RSA key to use for authenticating with Snowflake. Either this or `private_key_file` must be specified.
@@ -527,14 +611,71 @@ The maximum number of messages to have in flight at a given time. Increase this

The prefix to use when creating a channel name.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
By default this will create a channel name that is based on the table FQN so there will only be a single stream per table.
By default, if neither `channel_prefix` nor `channel_name` is specified, the output will create a channel name based on the table FQN, so there will only be a single stream per table.

At most `max_in_flight` channels will be opened.

This option is mutually exclusive with `channel_name`.

NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.


*Type*: `string`


```yml
# Examples

channel_prefix: channel-${HOST}
```

=== `channel_name`

The channel name to use.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
Note that batches are assumed to all contain messages for the same channel, so this interpolation is only executed on the first
message in each batch. It's recommended to batch at the input level to ensure that batches contain messages for the same channel
if using an input that is partitioned (such as an Apache Kafka topic).

This option is mutually exclusive with `channel_prefix`.

NOTE: There is a limit of 10,000 streams per table - if using more than 10k streams please reach out to Snowflake support.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


```yml
# Examples

channel_name: partition-${!@kafka_partition}
```
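
If an input can produce batches that mix partitions, one way to follow the per-channel batching recommendation above is to split batches by partition before they reach the output. A minimal sketch using the `group_by_value` processor (account, table, and credential values are placeholders):

```yaml
pipeline:
  processors:
    # Split any mixed batch into per-partition batches, so that the channel_name
    # interpolation (evaluated on the first message of each batch) matches every
    # message in that batch.
    - group_by_value:
        value: ${! @kafka_partition }
output:
  snowflake_streaming:
    channel_name: "partition-${!@kafka_partition}"
    account: "MYSNOW-ACCOUNT"
    user: MYUSER
    role: ACCOUNTADMIN
    database: "MYDATABASE"
    schema: "PUBLIC"
    table: "MYTABLE"
    private_key_file: "my/private/key.p8"
```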

=== `offset_token`

The offset token to use for exactly-once delivery of data in the pipeline. When data is sent on a channel, each message's offset token within a batch
is compared to the latest token for the channel. If the offset token is lexicographically less than the latest in the channel, the message is assumed to be a duplicate and
is dropped. This means it is *very important* to have ordered delivery to the output; any out-of-order messages will be seen as duplicates and dropped.
Specifically, retried messages could be seen as duplicates if later messages have succeeded in the meantime, so in most circumstances a dead letter queue
output should be employed for failed messages.

NOTE: Messages within a batch are assumed to be in increasing order by offset token. Additionally, if you're using a numeric value as an offset token, make sure to pad
the value so that it is lexicographically ordered in its string representation, since offset tokens are compared in string form.

For more information about offset tokens, see the https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens[^Snowflake Documentation].
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].


*Type*: `string`


```yml
# Examples

offset_token: offset-${!"%016X".format(@kafka_offset)}

offset_token: postgres-${!@lsn}
```
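
As a concrete illustration of why padding matters, consider hypothetical Kafka offsets 9 and 10. Compared as strings, `"9"` sorts after `"10"`, so an unpadded token for offset 10 would look like a duplicate and be dropped, whereas the zero-padded hexadecimal forms keep the numeric order:

```yml
# "9"  > "10"                               <- unpadded tokens compare in the wrong order
# "%016X".format(9)  => "0000000000000009"
# "%016X".format(10) => "000000000000000A" <- padded tokens keep the numeric order
offset_token: offset-${!"%016X".format(@kafka_offset)}
```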

