Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snowpipe: exactly once semantics #3060

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open

snowpipe: exactly once semantics #3060

wants to merge 29 commits into from

Conversation

rockwotj
Copy link
Collaborator

@rockwotj rockwotj commented Dec 5, 2024

Support 2 new properties in snowflake_streaming:

  1. offset_token: A new property to support exactly once delivery: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#offset-tokens
  2. channel_name: The ability to explicitly assign a batch to a channel. The current channel_prefix option doesn't support explicitly picking a channel, this allows exactly once from Kafka.

@rockwotj rockwotj force-pushed the snow-once branch 4 times, most recently from e5423ce to 10a350f Compare December 9, 2024 21:38
@rockwotj rockwotj marked this pull request as ready for review December 10, 2024 02:53
@rockwotj rockwotj force-pushed the snow-once branch 4 times, most recently from c189654 to f4c4952 Compare December 17, 2024 05:01
this is what is required for exactly once. We don't yet use it.
NOTE that since we can't ensure certain messages can go to a specific
channel at the moment, this only really works with max_in_flight=1,
which is probably fine for postgres, but another commit will support
channel_name properly, so one can specify explicitly the mapping from
data to channel.
This will help to re-use all this logic when we create the new output
that specifies channel names explicitly.
To a seperate function so it can be used between different outputs.
To clarify it, instead of spreading it out all over, this also means the
schema migration function can now be a free function
One that is responsible for coordination of schema evolution and other
small pieces (like custom mappings).

The purpose of this is to allow for another kind of inner output that
can allow for a user to specifically set the channel name (instead of
using a pool).
I'm not sure if this is 100% correct, but it will work for most cases.
See the examples on what this enables with a Redpanda/Kafka input (but
not kafka_franz!).
This seems a bit clearer and has nice duality with the indexed pool
We should try not to always run a SQL query everytime we startup for
cost reasons. Instead of running a query (which is likely flaky because
of identifier normalization anyways), just open the channel lazily and
catch the specific error for the table not existing, then create the
table and retry.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant