Skip to content

Commit

Permalink
e2e: SuiteSource (#2473)
Browse files Browse the repository at this point in the history
split out from #2395, where this allows tests to run against both PG & MySQL
  • Loading branch information
serprex authored Jan 21, 2025
1 parent 19bf030 commit 960aa0b
Show file tree
Hide file tree
Showing 21 changed files with 504 additions and 424 deletions.
6 changes: 5 additions & 1 deletion flow/e2e/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (s PeerFlowE2ETestSuiteBQ) Connector() *connpostgres.PostgresConnector {
return s.conn
}

func (s PeerFlowE2ETestSuiteBQ) Source() e2e.SuiteSource {
return &e2e.PostgresSource{PostgresConnector: s.conn}
}

func (s PeerFlowE2ETestSuiteBQ) DestinationConnector() connectors.Connector {
// TODO have BQ connector
return nil
Expand Down Expand Up @@ -110,7 +114,7 @@ func SetupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
return PeerFlowE2ETestSuiteBQ{
t: t,
bqSuffix: bqSuffix,
conn: conn,
conn: conn.PostgresConnector,
bqHelper: bqHelper,
}
}
42 changes: 21 additions & 21 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 1

env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 1

env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
Expand Down Expand Up @@ -182,7 +182,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -233,7 +233,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -289,7 +289,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -339,7 +339,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -395,7 +395,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -477,7 +477,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -529,7 +529,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -659,7 +659,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -737,7 +737,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_All_Types_Schema_Changes_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -805,7 +805,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -861,7 +861,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100
flowConnConfig.SoftDeleteColName = ""
flowConnConfig.SyncedAtColName = ""
Expand Down Expand Up @@ -921,7 +921,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -972,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
SoftDelete: true,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100

// wait for PeerFlowStatusQuery to finish setup
Expand Down Expand Up @@ -1103,7 +1103,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE id=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool {
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, srcName, "id,c1,c2,t")
pgRows, err := s.Source().GetRows(s.bqSuffix, srcName, "id,c1,c2,t")
if err != nil {
return false
}
Expand Down Expand Up @@ -1248,7 +1248,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize transaction", func() bool {
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, srcName, "id,c1,c2,t")
pgRows, err := s.Source().GetRows(s.bqSuffix, srcName, "id,c1,c2,t")
e2e.EnvNoError(s.t, env, err)
rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
"DELETE FROM %s WHERE id=1", srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize delete", func() bool {
pgRows, err := e2e.GetPgRows(s.conn, s.bqSuffix, tableName, "id,c1,c2,t")
pgRows, err := s.Source().GetRows(s.bqSuffix, tableName, "id,c1,c2,t")
if err != nil {
return false
}
Expand Down Expand Up @@ -1362,7 +1362,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_JSON_PKey_BQ() {
Destination: s.Peer().Name,
}

flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.MaxBatchSize = 100
flowConnConfig.SoftDeleteColName = ""
flowConnConfig.SyncedAtColName = ""
Expand Down
54 changes: 34 additions & 20 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

type ClickHouseSuite struct {
t *testing.T
conn *connpostgres.PostgresConnector
source e2e.SuiteSource
s3Helper *e2e_s3.S3TestHelper
suffix string
}
Expand All @@ -35,7 +35,15 @@ func (s ClickHouseSuite) T() *testing.T {
}

func (s ClickHouseSuite) Connector() *connpostgres.PostgresConnector {
return s.conn
c, ok := s.source.Connector().(*connpostgres.PostgresConnector)
if !ok {
s.t.Skipf("skipping test because it relies on PostgresConnector, while source is %T", s.source)
}
return c
}

func (s ClickHouseSuite) Source() e2e.SuiteSource {
return s.source
}

func (s ClickHouseSuite) DestinationConnector() connectors.Connector {
Expand Down Expand Up @@ -88,7 +96,7 @@ func (s ClickHouseSuite) DestinationTable(table string) string {

func (s ClickHouseSuite) Teardown() {
require.NoError(s.t, s.s3Helper.CleanUp(context.Background()))
e2e.TearDownPostgres(s)
s.source.Teardown(s.t, s.Suffix())
}

func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
Expand Down Expand Up @@ -216,27 +224,33 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
return batch, rows.Err()
}

func SetupSuite(t *testing.T) ClickHouseSuite {
func SetupSuite[TSource e2e.SuiteSource](
t *testing.T,
setupSource func(*testing.T, string) (TSource, error),
) func(*testing.T) ClickHouseSuite {
t.Helper()
return func(t *testing.T) ClickHouseSuite {
t.Helper()

suffix := "ch_" + strings.ToLower(shared.RandomString(8))
conn, err := e2e.SetupPostgres(t, suffix)
require.NoError(t, err, "failed to setup postgres")
suffix := "ch_" + strings.ToLower(shared.RandomString(8))
source, err := setupSource(t, suffix)
require.NoError(t, err, "failed to setup postgres")

s3Helper, err := e2e_s3.NewS3TestHelper(e2e_s3.Minio)
require.NoError(t, err, "failed to setup S3")
s3Helper, err := e2e_s3.NewS3TestHelper(e2e_s3.Minio)
require.NoError(t, err, "failed to setup S3")

s := ClickHouseSuite{
t: t,
conn: conn,
suffix: suffix,
s3Helper: s3Helper,
}
s := ClickHouseSuite{
t: t,
source: e2e.SuiteSource(source),
suffix: suffix,
s3Helper: s3Helper,
}

ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
require.NoError(t, err, "failed to connect to clickhouse")
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
require.NoError(t, err, "failed to create clickhouse database")
ch, err := connclickhouse.Connect(context.Background(), nil, s.PeerForDatabase("default").GetClickhouseConfig())
require.NoError(t, err, "failed to connect to clickhouse")
err = ch.Exec(context.Background(), "CREATE DATABASE e2e_test_"+suffix)
require.NoError(t, err, "failed to create clickhouse database")

return s
return s
}
}
Loading

0 comments on commit 960aa0b

Please sign in to comment.