Commit d365a8b

QRecordSchema: close schemaLatch in Close (#2398)

Goroutines were leaking while waiting on schemaLatch: if a provider hit an error, the latch could be left open indefinitely, so waiters never woke up.

1 parent ff1f11d · commit d365a8b
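The mechanism is easiest to see in miniature. Below is a self-contained sketch of the latch pattern the commit repairs (illustrative names, not PeerDB code): before the fix, only the success path closed the latch, so a consumer blocked on it leaked whenever the producer erred out; after the fix, Close records the error and closes the latch too.

package main

import (
	"errors"
	"fmt"
)

// Illustrative shape of the schema latch; the real fields live on the
// QRecordStream and on PgCopyShared (see sink_pg.go below).
type stream struct {
	schemaLatch chan struct{} // closed once a schema -- or a failure -- is known
	schema      []string
	err         error // added by the fix so released waiters can see the failure
}

// Success path: publish the schema and release every waiter.
func (s *stream) setSchema(schema []string) {
	s.schema = schema
	close(s.schemaLatch)
}

// Pre-fix, the error path never touched the latch, so goroutines blocked on
// <-schemaLatch waited forever. Post-fix, Close releases them with the error.
func (s *stream) close(err error) {
	s.err = err
	close(s.schemaLatch)
}

func main() {
	s := &stream{schemaLatch: make(chan struct{})}
	done := make(chan struct{})
	go func() { // stands in for a connector waiting on the schema
		<-s.schemaLatch // pre-fix: leaked here when the provider hit an error
		fmt.Println("released:", s.schema, s.err)
		close(done)
	}()
	s.close(errors.New("provider hit error"))
	<-done // prints: released: [] provider hit error
}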

File tree

15 files changed (+122, -72)


flow/connectors/bigquery/qrep.go (+7 -3)

@@ -23,7 +23,10 @@ func (c *BigQueryConnector) SyncQRepRecords(
 ) (int, error) {
 	// Ensure the destination table is available.
 	destTable := config.DestinationTableIdentifier
-	srcSchema := stream.Schema()
+	srcSchema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	tblMetadata, err := c.replayTableSchemaDeltasQRep(ctx, config, partition, srcSchema)
 	if err != nil {

@@ -80,8 +83,9 @@ func (c *BigQueryConnector) replayTableSchemaDeltasQRep(
 		}
 	}

-	err = c.ReplayTableSchemaDeltas(ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta})
-	if err != nil {
+	if err := c.ReplayTableSchemaDeltas(
+		ctx, config.Env, config.FlowJobName, []*protos.TableSchemaDelta{tableSchemaDelta},
+	); err != nil {
 		return nil, fmt.Errorf("failed to add columns to destination table: %w", err)
 	}
 	dstTableMetadata, err = bqTable.Metadata(ctx)
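Every connector below repeats this same three-line check, which implies that stream.Schema() changed from returning a bare schema to returning a (schema, error) pair. The stream side is not in the visible hunks; what follows is a hypothetical reconstruction of the accessor under that assumption (the real QRecordStream lives in flow/model).

package model

// Hypothetical sketch; the real QRecordStream's diff is not shown in this commit view.
type QRecordSchema struct{ Fields []string }

type QRecordStream struct {
	schemaLatch chan struct{} // closed by the producer, or by Close on failure
	schema      QRecordSchema
	err         error
}

// Schema blocks until a schema is published or the stream is closed with an
// error, then returns whichever happened -- the pair every caller now checks.
func (s *QRecordStream) Schema() (QRecordSchema, error) {
	<-s.schemaLatch
	return s.schema, s.err
}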

flow/connectors/clickhouse/qrep_avro_sync.go (+10 -2)

@@ -67,7 +67,10 @@ func (s *ClickHouseAvroSyncMethod) SyncRecords(
 ) (int, error) {
 	dstTableName := s.config.DestinationTableIdentifier

-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
 	s.logger.Info("sync function called and schema acquired",
 		slog.String("dstTable", dstTableName))

@@ -106,7 +109,12 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 	stagingPath := s.credsProvider.BucketPath
 	startTime := time.Now()

-	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, stream.Schema())
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
+
+	avroSchema, err := s.getAvroSchema(ctx, config.Env, dstTableName, schema)
 	if err != nil {
 		return 0, err
 	}

flow/connectors/elasticsearch/qrep.go (+4 -1)

@@ -42,7 +42,10 @@ func (esc *ElasticsearchConnector) SyncQRepRecords(ctx context.Context, config *
 ) (int, error) {
 	startTime := time.Now()

-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	var bulkIndexFatalError error
 	var bulkIndexErrors []error

flow/connectors/kafka/qrep.go (+4 -1)

@@ -27,7 +27,10 @@ func (c *KafkaConnector) SyncQRepRecords(
 ) (int, error) {
 	startTime := time.Now()
 	numRecords := atomic.Int64{}
-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, nil, queueErr)

flow/connectors/postgres/qrep.go (+6 -4)

@@ -32,7 +32,7 @@ type QRepPullSink interface {
 }

 type QRepSyncSink interface {
-	GetColumnNames() []string
+	GetColumnNames() ([]string, error)
 	CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
 }

@@ -550,7 +550,10 @@ func syncQRepRecords(
 		upsertMatchCols[col] = struct{}{}
 	}

-	columnNames := sink.GetColumnNames()
+	columnNames, err := sink.GetColumnNames()
+	if err != nil {
+		return -1, fmt.Errorf("failed to get column names: %w", err)
+	}
 	setClauseArray := make([]string, 0, len(upsertMatchColsList)+1)
 	selectStrArray := make([]string, 0, len(columnNames))
 	for _, col := range columnNames {

@@ -578,8 +581,7 @@
 			setClause,
 		)
 		c.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
-		_, err := tx.Exec(ctx, upsertStmt)
-		if err != nil {
+		if _, err := tx.Exec(ctx, upsertStmt); err != nil {
 			return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
 		}
 	}
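Changing GetColumnNames to return ([]string, error) silently obligates every QRepSyncSink implementation. A compile-time assertion (not part of this commit, just a common Go idiom) would catch any sink that missed the new signature:

// Hypothetical guards; with these in place, a sink still exposing the old
// GetColumnNames() []string signature fails the build at this line rather
// than at some distant call site.
var (
	_ QRepSyncSink = PgCopyReader{}
	_ QRepSyncSink = RecordStreamSink{}
)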

flow/connectors/postgres/qrep_query_executor.go (+30 -18)

@@ -161,9 +161,7 @@ func (qe *QRepQueryExecutor) processRowsStream(
 		record, err := qe.mapRowToQRecord(rows, fieldDescriptions)
 		if err != nil {
 			qe.logger.Error("[pg_query_executor] failed to map row to QRecord", slog.Any("error", err))
-			err := fmt.Errorf("failed to map row to QRecord: %w", err)
-			stream.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
 		}

 		stream.Records <- record

@@ -189,12 +187,10 @@ func (qe *QRepQueryExecutor) processFetchedRows(
 ) (int, error) {
 	rows, err := qe.executeQueryInTx(ctx, tx, cursorName, fetchSize)
 	if err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] failed to execute query in tx",
 			slog.Any("error", err), slog.String("query", query))
 		return 0, fmt.Errorf("[pg_query_executor] failed to execute query in tx: %w", err)
 	}
-
 	defer rows.Close()

 	fieldDescriptions := rows.FieldDescriptions()

@@ -210,7 +206,6 @@
 	}

 	if err := rows.Err(); err != nil {
-		stream.Close(err)
 		qe.logger.Error("[pg_query_executor] row iteration failed",
 			slog.String("query", query), slog.Any("error", err))
 		return 0, fmt.Errorf("[pg_query_executor] row iteration failed '%s': %w", query, err)

@@ -225,7 +220,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
 	args ...interface{},
 ) (*model.QRecordBatch, error) {
 	stream := model.NewQRecordStream(1024)
-	errors := make(chan error, 1)
+	errors := make(chan struct{})
+	var errorsError error
 	qe.logger.Info("Executing and processing query", slog.String("query", query))

 	// must wait on errors to close before returning to maintain qe.conn exclusion

@@ -234,23 +230,28 @@
 		_, err := qe.ExecuteAndProcessQueryStream(ctx, stream, query, args...)
 		if err != nil {
 			qe.logger.Error("[pg_query_executor] failed to execute and process query stream", slog.Any("error", err))
-			errors <- err
+			errorsError = err
 		}
 	}()

 	select {
-	case err := <-errors:
-		return nil, err
+	case <-errors:
+		return nil, errorsError
 	case <-stream.SchemaChan():
+		schema, err := stream.Schema()
+		if err != nil {
+			return nil, err
+		}
 		batch := &model.QRecordBatch{
-			Schema:  stream.Schema(),
+			Schema:  schema,
 			Records: nil,
 		}
 		for record := range stream.Records {
 			batch.Records = append(batch.Records, record)
 		}
-		if err := <-errors; err != nil {
-			return nil, err
+		<-errors
+		if errorsError != nil {
+			return nil, errorsError
 		}
 		if err := stream.Err(); err != nil {
 			return nil, fmt.Errorf("[pg] failed to get record from stream: %w", err)

@@ -288,10 +289,16 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSink(
 	})
 	if err != nil {
 		qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-		return 0, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		sink.Close(err)
+		return 0, err
 	}

-	return sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	totalRecords, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	if err != nil {
+		sink.Close(err)
+	}
+	return totalRecords, err
 }

 func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(

@@ -310,16 +317,21 @@ func (qe *QRepQueryExecutor) ExecuteQueryIntoSinkGettingCurrentSnapshotXmin(
 	})
 	if err != nil {
 		qe.logger.Error("[pg_query_executor] failed to begin transaction", slog.Any("error", err))
-		return 0, currentSnapshotXmin.Int64, fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		err := fmt.Errorf("[pg_query_executor] failed to begin transaction: %w", err)
+		sink.Close(err)
+		return 0, currentSnapshotXmin.Int64, err
 	}

-	err = tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin)
-	if err != nil {
+	if err := tx.QueryRow(ctx, "select txid_snapshot_xmin(txid_current_snapshot())").Scan(&currentSnapshotXmin); err != nil {
 		qe.logger.Error("[pg_query_executor] failed to get current snapshot xmin", slog.Any("error", err))
+		sink.Close(err)
 		return 0, currentSnapshotXmin.Int64, err
 	}

 	totalRecordsFetched, err := sink.ExecuteQueryWithTx(ctx, qe, tx, query, args...)
+	if err != nil {
+		sink.Close(err)
+	}
 	return totalRecordsFetched, currentSnapshotXmin.Int64, err
 }
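ExecuteAndProcessQuery swaps a buffered error channel for a chan struct{} plus a shared errorsError variable. The comment retained in the diff ("must wait on errors to close before returning") indicates the worker goroutine closes the channel when it finishes, presumably via a defer in context not shown here. A self-contained sketch of that pattern, and why reading the shared variable after the close is race-free:

package main

import (
	"errors"
	"fmt"
)

// run mirrors the done-channel idiom above (names are illustrative): the
// worker stores its error in a variable and closes a struct{} channel when it
// finishes. A close satisfies any number of receives, so both select arms --
// and a later drain -- can wait on the same signal, which a one-shot buffered
// send could not provide.
func run(fail bool) error {
	done := make(chan struct{})
	var workErr error

	go func() {
		defer close(done) // always fires, error or not
		if fail {
			workErr = errors.New("query failed")
		}
	}()

	<-done // the close happens-after the write to workErr, so this read is safe
	return workErr
}

func main() {
	fmt.Println(run(false)) // <nil>
	fmt.Println(run(true))  // query failed
}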

flow/connectors/postgres/sink_pg.go (+13 -14)

@@ -15,6 +15,7 @@ import (

 type PgCopyShared struct {
 	schemaLatch chan struct{}
+	err         error
 	schema      []string
 	schemaSet   bool
 }

@@ -54,13 +55,10 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
 	defer shared.RollbackTx(tx, qe.logger)

 	if qe.snapshot != "" {
-		_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-		if err != nil {
+		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
 			qe.logger.Error("[pg_query_executor] failed to set snapshot",
 				slog.Any("error", err), slog.String("query", query))
-			err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-			p.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
 		}
 	}

@@ -88,17 +86,13 @@
 	if err != nil {
 		qe.logger.Info("[pg_query_executor] failed to copy",
 			slog.String("copyQuery", copyQuery), slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
-		p.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to copy: %w", err)
 	}

 	qe.logger.Info("Committing transaction")
 	if err := tx.Commit(ctx); err != nil {
 		qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-		p.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
 	}

 	totalRecordsFetched := ct.RowsAffected()

@@ -109,15 +103,20 @@
 func (p PgCopyWriter) Close(err error) {
 	p.PipeWriter.CloseWithError(err)
+	p.schema.err = err
+	p.SetSchema(nil)
 }

-func (p PgCopyReader) GetColumnNames() []string {
+func (p PgCopyReader) GetColumnNames() ([]string, error) {
 	<-p.schema.schemaLatch
-	return p.schema.schema
+	return p.schema.schema, p.schema.err
 }

 func (p PgCopyReader) CopyInto(ctx context.Context, c *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-	cols := p.GetColumnNames()
+	cols, err := p.GetColumnNames()
+	if err != nil {
+		return 0, err
+	}
 	quotedCols := make([]string, 0, len(cols))
 	for _, col := range cols {
 		quotedCols = append(quotedCols, QuoteIdentifier(col))
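Note that PgCopyWriter.Close now funnels through SetSchema(nil), and closing an already-closed channel panics in Go, so this is only safe if SetSchema is a no-op after its first call. The schemaSet field on PgCopyShared suggests exactly that guard; its body is outside the visible hunks, so the following is an assumption, not quoted code:

// Assumed semantics of SetSchema, inferred from the schemaSet flag: only the
// first call publishes a schema and closes the latch, so a Close after a
// successful copy (or a repeated Close) cannot double-close schemaLatch.
func (p PgCopyWriter) SetSchema(schema []string) {
	if !p.schema.schemaSet {
		p.schema.schema = schema
		p.schema.schemaSet = true
		close(p.schema.schemaLatch)
	}
}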

flow/connectors/postgres/sink_q.go (+15 -14)

@@ -26,13 +26,10 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
 	defer shared.RollbackTx(tx, qe.logger)

 	if qe.snapshot != "" {
-		_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
-		if err != nil {
+		if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)); err != nil {
 			qe.logger.Error("[pg_query_executor] failed to set snapshot",
 				slog.Any("error", err), slog.String("query", query))
-			err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
-			stream.Close(err)
-			return 0, err
+			return 0, fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
 		}
 	}

@@ -47,9 +44,7 @@
 	if _, err := tx.Exec(ctx, cursorQuery, args...); err != nil {
 		qe.logger.Info("[pg_query_executor] failed to declare cursor",
 			slog.String("cursorQuery", cursorQuery), slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
-		stream.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to declare cursor: %w", err)
 	}

 	qe.logger.Info(fmt.Sprintf("[pg_query_executor] declared cursor '%s' for query '%s'", cursorName, query))

@@ -73,9 +68,7 @@
 	qe.logger.Info("Committing transaction")
 	if err := tx.Commit(ctx); err != nil {
 		qe.logger.Error("[pg_query_executor] failed to commit transaction", slog.Any("error", err))
-		err = fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
-		stream.Close(err)
-		return 0, err
+		return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
 	}

 	qe.logger.Info(fmt.Sprintf("[pg_query_executor] committed transaction for query '%s', rows = %d",

@@ -84,9 +77,17 @@
 }

 func (stream RecordStreamSink) CopyInto(ctx context.Context, _ *PostgresConnector, tx pgx.Tx, table pgx.Identifier) (int64, error) {
-	return tx.CopyFrom(ctx, table, stream.GetColumnNames(), model.NewQRecordCopyFromSource(stream.QRecordStream))
+	columnNames, err := stream.GetColumnNames()
+	if err != nil {
+		return 0, err
+	}
+	return tx.CopyFrom(ctx, table, columnNames, model.NewQRecordCopyFromSource(stream.QRecordStream))
 }

-func (stream RecordStreamSink) GetColumnNames() []string {
-	return stream.Schema().GetColumnNames()
+func (stream RecordStreamSink) GetColumnNames() ([]string, error) {
+	schema, err := stream.Schema()
+	if err != nil {
+		return nil, err
+	}
+	return schema.GetColumnNames(), nil
 }

flow/connectors/pubsub/qrep.go (+5 -2)

@@ -25,11 +25,14 @@ func (c *PubSubConnector) SyncQRepRecords(
 	stream *model.QRecordStream,
 ) (int, error) {
 	startTime := time.Now()
-	numRecords := atomic.Int64{}
-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}
 	topiccache := topicCache{cache: make(map[string]*pubsub.Topic)}
 	publish := make(chan publishResult, 32)
 	waitChan := make(chan struct{})
+	numRecords := atomic.Int64{}

 	queueCtx, queueErr := context.WithCancelCause(ctx)
 	pool, err := c.createPool(queueCtx, config.Env, config.Script, config.FlowJobName, &topiccache, publish, queueErr)

flow/connectors/s3/qrep.go (+4 -1)

@@ -17,7 +17,10 @@ func (c *S3Connector) SyncQRepRecords(
 	partition *protos.QRepPartition,
 	stream *model.QRecordStream,
 ) (int, error) {
-	schema := stream.Schema()
+	schema, err := stream.Schema()
+	if err != nil {
+		return 0, err
+	}

 	dstTableName := config.DestinationTableIdentifier
 	avroSchema, err := getAvroSchema(ctx, config.Env, dstTableName, schema)
