Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: implement change stream reader from GCP SpannerChangeStream #3066

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

anicoll
Copy link

@anicoll anicoll commented Dec 9, 2024

No description provided.

@CLAassistant
Copy link

CLAassistant commented Dec 9, 2024

CLA assistant check
All committers have signed the CLA.

@anicoll anicoll marked this pull request as ready for review December 9, 2024 05:37
func newSpannerChangeStreamInputConfig() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Version("3.43.0").
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very aware this is most likely not correct...

@rockwotj rockwotj self-requested a review December 10, 2024 15:17
Field(service.NewIntField("start_time_epoch").Advanced().Optional().Default(0).Description("Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.")).
Field(service.NewStringField("partition_dsn").Optional().Description("Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")).
Field(service.NewStringField("partition_table").Optional().Description("Name of the table to create/use in spanner to track change stream partition metadata.")).
Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions.").Default(false)).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: use_in_mememory_partition

Version("3.43.0").
Categories("Services", "GCP").
Summary("Creates an input that consumes from a spanner change stream.").
Field(service.NewStringField("stream_dsn").Description("Required field to use to connect to spanner for the change stream.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we are now creating the fields as constants so it's harder to mess them up.


func init() {
err := service.RegisterInput(
"gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(),
"gcp_spanner_cdc", newSpannerChangeStreamInputConfig(),

if rerr := i.reader.Stream(jobctx, i.changeChannel); rerr != nil {
i.log.Errorf("Subscription error: %v\n", rerr)
close(i.changeChannel)
panic(rerr)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't panic in Connect, please gracefully mark the input as disconnected, then return service.ErrNotConnected in Read so that the input is restarted.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to see an example of doing this - see the postgres_cdc input.

}

func (i *spannerStreamInput) Close(_ context.Context) error {
close(i.changeChannel)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multiple different places close this - generally only the writer of a channel should close it.

If someone tries to write on this channel there will be a panic.

@@ -25,6 +26,7 @@ require (
github.com/Masterminds/squirrel v1.5.4
github.com/PaesslerAG/gval v1.2.2
github.com/PaesslerAG/jsonpath v0.1.1
github.com/anicoll/screamer v0.0.0-20241206035431-9ba919b54dfd
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We try not to take dependencies on libraries that aren't stable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on this comment and the one below.

raised this PR.
#3083


func (i *spannerStreamInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
msg := <-i.changeChannel
data, err := json.Marshal(msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a struct from an external library - which I don't feel comfortable taking it's model as the same format that we using in our pipeline - the library could make a change and that would be a breaking change in Connect.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#3083
Does this make a difference?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants