Skip to content

Commit d2ac16f

Browse files
committed
e2e: SuiteSource
split out from mysql PR, where this allows tests to run against both PG & MySQL
1 parent 67ba3d6 commit d2ac16f

21 files changed

+504
-424
lines changed

flow/e2e/bigquery/bigquery.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
3636
return s.conn
3737
}
3838

39+
func (s PeerFlowE2ETestSuiteBQ) Source() e2e.SuiteSource {
40+
return &e2e.PostgresSource{PostgresConnector: s.conn}
41+
}
42+
3943
func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector {
4044
// TODO have BQ connector
4145
return nil
@@ -110,7 +114,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
110114
return PeerFlowE2ETestSuiteBQ{
111115
t: t,
112116
bqSuffix: bqSuffix,
113-
conn: conn,
117+
conn: conn.PostgresConnector,
114118
bqHelper: bqHelper,
115119
}
116120
}

flow/e2e/bigquery/peer_flow_bq_test.go

+21-21
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
119119
Destination: s.Peer().Name,
120120
}
121121

122-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
122+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
123123
flowConnConfig.MaxBatchSize = 1
124124

125125
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
@@ -150,7 +150,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
150150
Destination: s.Peer().Name,
151151
}
152152

153-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
153+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
154154
flowConnConfig.MaxBatchSize = 1
155155

156156
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
@@ -182,7 +182,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
182182
Destination: s.Peer().Name,
183183
}
184184

185-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
185+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
186186
flowConnConfig.MaxBatchSize = 100
187187

188188
// wait for PeerFlowStatusQuery to finish setup
@@ -233,7 +233,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
233233
Destination: s.Peer().Name,
234234
}
235235

236-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
236+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
237237
flowConnConfig.MaxBatchSize = 100
238238

239239
// wait for PeerFlowStatusQuery to finish setup
@@ -289,7 +289,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
289289
Destination: s.Peer().Name,
290290
}
291291

292-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
292+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
293293
flowConnConfig.MaxBatchSize = 100
294294

295295
// wait for PeerFlowStatusQuery to finish setup
@@ -339,7 +339,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
339339
Destination: s.Peer().Name,
340340
}
341341

342-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
342+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
343343
flowConnConfig.MaxBatchSize = 100
344344

345345
// wait for PeerFlowStatusQuery to finish setup
@@ -395,7 +395,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
395395
Destination: s.Peer().Name,
396396
}
397397

398-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
398+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
399399
flowConnConfig.MaxBatchSize = 100
400400

401401
// wait for PeerFlowStatusQuery to finish setup
@@ -477,7 +477,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
477477
Destination: s.Peer().Name,
478478
}
479479

480-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
480+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
481481
flowConnConfig.MaxBatchSize = 100
482482

483483
// wait for PeerFlowStatusQuery to finish setup
@@ -529,7 +529,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
529529
Destination: s.Peer().Name,
530530
}
531531

532-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
532+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
533533
flowConnConfig.MaxBatchSize = 100
534534

535535
// wait for PeerFlowStatusQuery to finish setup
@@ -605,7 +605,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
605605
Destination: s.Peer().Name,
606606
}
607607

608-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
608+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
609609
flowConnConfig.MaxBatchSize = 100
610610

611611
// wait for PeerFlowStatusQuery to finish setup
@@ -659,7 +659,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
659659
Destination: s.Peer().Name,
660660
}
661661

662-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
662+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
663663
flowConnConfig.MaxBatchSize = 100
664664

665665
// wait for PeerFlowStatusQuery to finish setup
@@ -737,7 +737,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_All_Types_Schema_Changes_BQ() {
737737
Destination: s.Peer().Name,
738738
}
739739

740-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
740+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
741741
flowConnConfig.MaxBatchSize = 100
742742

743743
// wait for PeerFlowStatusQuery to finish setup
@@ -805,7 +805,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
805805
Destination: s.Peer().Name,
806806
}
807807

808-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
808+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
809809
flowConnConfig.MaxBatchSize = 100
810810

811811
// wait for PeerFlowStatusQuery to finish setup
@@ -861,7 +861,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
861861
Destination: s.Peer().Name,
862862
}
863863

864-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
864+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
865865
flowConnConfig.MaxBatchSize = 100
866866
flowConnConfig.SoftDeleteColName = ""
867867
flowConnConfig.SyncedAtColName = ""
@@ -921,7 +921,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
921921
Destination: s.Peer().Name,
922922
}
923923

924-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
924+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
925925
flowConnConfig.MaxBatchSize = 100
926926

927927
// wait for PeerFlowStatusQuery to finish setup
@@ -972,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
972972
SoftDelete: true,
973973
}
974974

975-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
975+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
976976
flowConnConfig.MaxBatchSize = 100
977977

978978
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
@@ -1022,7 +1022,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
10221022
Destination: s.Peer().Name,
10231023
}
10241024

1025-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
1025+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
10261026
flowConnConfig.MaxBatchSize = 100
10271027

10281028
// wait for PeerFlowStatusQuery to finish setup
@@ -1103,7 +1103,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
11031103
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE id=1`, srcTableName))
11041104
e2e.EnvNoError(s.t, env, err)
11051105
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool {
1106-
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, srcName, "id,c1,c2,t")
1106+
pgRows, err := s.Source().GetRows(s.bqSuffix, srcName, "id,c1,c2,t")
11071107
if err != nil {
11081108
return false
11091109
}
@@ -1248,7 +1248,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
12481248
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))
12491249

12501250
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize transaction", func() bool {
1251-
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, srcName, "id,c1,c2,t")
1251+
pgRows, err := s.Source().GetRows(s.bqSuffix, srcName, "id,c1,c2,t")
12521252
e2e.EnvNoError(s.t, env, err)
12531253
rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
12541254
if err != nil {
@@ -1312,7 +1312,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
13121312
"DELETE FROM %s WHERE id=1", srcTableName))
13131313
e2e.EnvNoError(s.t, env, err)
13141314
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool {
1315-
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, tableName, "id,c1,c2,t")
1315+
pgRows, err := s.Source().GetRows(s.bqSuffix, tableName, "id,c1,c2,t")
13161316
if err != nil {
13171317
return false
13181318
}
@@ -1362,7 +1362,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_JSON_PKey_BQ() {
13621362
Destination: s.Peer().Name,
13631363
}
13641364

1365-
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
1365+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
13661366
flowConnConfig.MaxBatchSize = 100
13671367
flowConnConfig.SoftDeleteColName = ""
13681368
flowConnConfig.SyncedAtColName = ""

flow/e2e/clickhouse/clickhouse.go

+34-20
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
type ClickHouseSuite struct {
2727
t *testing.T
28-
conn *connpostgres.PostgresConnector
28+
source e2e.SuiteSource
2929
s3Helper *e2e_s3.S3TestHelper
3030
suffix string
3131
}
@@ -35,7 +35,15 @@ func (s ClickHouseSuite) T() *testing.T {
3535
}
3636

3737
func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector {
38-
return s.conn
38+
c, ok := s.source.Connector().(*connpostgres.PostgresConnector)
39+
if !ok {
40+
s.t.Skipf("skipping test because it relies on PostgresConnector, while source is %T", s.source)
41+
}
42+
return c
43+
}
44+
45+
func (s ClickHouseSuite) Source() e2e.SuiteSource {
46+
return s.source
3947
}
4048

4149
func (s ClickHouseSuite) DestinationConnector() connectors.Connector {
@@ -88,7 +96,7 @@ func (s ClickHouseSuite) DestinationTable(table string) string {
8896

8997
func (s ClickHouseSuite) Teardown() {
9098
require.NoError(s.t, s.s3Helper.CleanUp(context.Background()))
91-
e2e.TearDownPostgres(s)
99+
s.source.Teardown(s.t, s.Suffix())
92100
}
93101

94102
func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
@@ -216,27 +224,33 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
216224
return batch, rows.Err()
217225
}
218226

219-
func SetupSuite(t *testing.T) ClickHouseSuite {
227+
func SetupSuite[TSource e2e.SuiteSource](
228+
t *testing.T,
229+
setupSource func(*testing.T, string) (TSource, error),
230+
) func(*testing.T) ClickHouseSuite {
220231
t.Helper()
232+
return func(t *testing.T) ClickHouseSuite {
233+
t.Helper()
221234

222-
suffix := "ch_" + strings.ToLower(shared.RandomString(8))
223-
conn, err := e2e.SetupPostgres(t, suffix)
224-
require.NoError(t, err, "failed to setup postgres")
235+
suffix := "ch_" + strings.ToLower(shared.RandomString(8))
236+
source, err := setupSource(t, suffix)
237+
require.NoError(t, err, "failed to setup postgres")
225238

226-
s3Helper, err := e2e_s3.NewS3TestHelper(e2e_s3.Minio)
227-
require.NoError(t, err, "failed to setup S3")
239+
s3Helper, err := e2e_s3.NewS3TestHelper(e2e_s3.Minio)
240+
require.NoError(t, err, "failed to setup S3")
228241

229-
s := ClickHouseSuite{
230-
t: t,
231-
conn: conn,
232-
suffix: suffix,
233-
s3Helper: s3Helper,
234-
}
242+
s := ClickHouseSuite{
243+
t: t,
244+
source: e2e.SuiteSource(source),
245+
suffix: suffix,
246+
s3Helper: s3Helper,
247+
}
235248

236-
ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
237-
require.NoError(t, err, "failed to connect to clickhouse")
238-
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
239-
require.NoError(t, err, "failed to create clickhouse database")
249+
ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
250+
require.NoError(t, err, "failed to connect to clickhouse")
251+
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
252+
require.NoError(t, err, "failed to create clickhouse database")
240253

241-
return s
254+
return s
255+
}
242256
}

0 commit comments

Comments
 (0)