Skip to content

Commit

Permalink
nullable e2e includes numeric
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 21, 2025
1 parent 75fc69b commit 319ebd3
Showing 1 changed file with 15 additions and 29 deletions.
44 changes: 15 additions & 29 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,17 @@ func (s ClickHouseSuite) Test_NullableMirrorSetting() {
srcFullName := s.attachSchemaSuffix(srcTableName)
dstTableName := "test_nullable_mirror_dst"

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
ky TEXT NOT NULL,
val TEXT,
n NUMERIC,
t TIMESTAMP
);
`, srcFullName))
require.NoError(s.t, err)
`, srcFullName)))

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key) VALUES ('init');
`, srcFullName))
require.NoError(s.t, err)
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('init')`, srcFullName)))

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("ch_nullable_mirror"),
Expand All @@ -219,14 +215,11 @@ func (s ClickHouseSuite) Test_NullableMirrorSetting() {
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",val,n,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,ky,val,n,t")

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key) VALUES ('cdc');
`, srcFullName))
require.NoError(s.t, err)
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('cdc')`, srcFullName)))

e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",val,n,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,ky,val,n,t")

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
Expand All @@ -237,21 +230,17 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() {
srcFullName := s.attachSchemaSuffix(srcTableName)
dstTableName := "test_nullable_column_dst"

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
key TEXT NOT NULL,
ky TEXT NOT NULL,
val TEXT,
n NUMERIC,
t TIMESTAMP
);
`, srcFullName))
require.NoError(s.t, err)
`, srcFullName)))

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key) VALUES ('init');
`, srcFullName))
require.NoError(s.t, err)
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('init')`, srcFullName)))

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("ch_nullable_column"),
Expand All @@ -262,7 +251,7 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() {
flowConnConfig.DoInitialSnapshot = true
for _, tm := range flowConnConfig.TableMappings {
tm.Columns = []*protos.ColumnSetting{
{SourceName: "key", NullableEnabled: true},
{SourceName: "ky", NullableEnabled: true},
{SourceName: "val", NullableEnabled: true},
{SourceName: "n", NullableEnabled: true},
{SourceName: "t", NullableEnabled: true},
Expand All @@ -273,14 +262,11 @@ func (s ClickHouseSuite) Test_NullableColumnSetting() {
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\",val,n,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,ky,val,n,t")

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s (key) VALUES ('cdc');
`, srcFullName))
require.NoError(s.t, err)
require.NoError(s.t, s.source.Exec(fmt.Sprintf(`INSERT INTO %s (ky) VALUES ('cdc')`, srcFullName)))

e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\",val,n,t")
e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,ky,val,n,t")

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
Expand Down

0 comments on commit 319ebd3

Please sign in to comment.