Merge branch 'main' into customer-solar
Amogh-Bharadwaj committed Nov 26, 2024
2 parents b36c6c8 + 04deaf0 commit 70f684a
Showing 28 changed files with 559 additions and 367 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/codeql-analysis.yml
@@ -47,12 +47,12 @@ jobs:
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3
uses: github/codeql-action/init@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
with:
languages: ${{ matrix.language }}
build-mode: ${{ matrix.build-mode }}

- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@ea9e4e37992a54ee68a9622e985e60c8e8f12d9f # v3
uses: github/codeql-action/analyze@f09c1c0a94de965c15400f5634aa42fac8fb8f88 # v3
with:
category: "/language:${{matrix.language}}"
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
@@ -25,7 +25,7 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
elasticsearch:
image: elasticsearch:8.16.0@sha256:a411f7c17549209c5839b69f929de00bd91f1e2dcf08b65d5f41b122eae17f5e
image: elasticsearch:8.16.1@sha256:e5ee5f8dacbf18fa3ab59a098cc7d4d69f73e61637eb45f1c029e74b1cb200a1
ports:
- 9200:9200
env:
8 changes: 8 additions & 0 deletions README.md
@@ -54,6 +54,14 @@ PeerDB is an ETL/ELT tool built for PostgreSQL. We implement multiple Postgres n

**From a feature richness standpoint**, we support efficient syncing of tables with large (TOAST) columns. We support multiple streaming modes - log-based (CDC), query-based streaming, etc. We provide rich data-type mapping and plan to support every data type Postgres supports (incl. custom types) to the best extent possible on the target data-store.

### Now available natively in ClickHouse Cloud (Private Preview)

PeerDB is now available natively in ClickHouse Cloud (Private Preview). Learn more about it [here](https://clickhouse.com/cloud/clickpipes/postgres-cdc-connector).

<a href="https://clickhouse.com/cloud/clickpipes/postgres-cdc-connector">
<img src="images/in-clickpipes.png" width="512" />
</a>

#### **Postgres-compatible SQL interface to do ETL**

The Postgres-compatible SQL interface for ETL is unique to PeerDB and enables you to operate in a language you are familiar with. You can do ETL the same way you work with your databases.
23 changes: 9 additions & 14 deletions flow/activities/flowable.go
@@ -28,7 +28,6 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/pua"
"github.com/PeerDB-io/peer-flow/shared"
@@ -759,11 +758,10 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
return
}

slotMetricGauges := peerdb_gauges.SlotMetricGauges{}
slotMetricGauges := otel_metrics.SlotMetricGauges{}
if a.OtelManager != nil {
slotLagGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.SlotLagGaugeName),
slotLagGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.SlotLagGaugeName),
metric.WithUnit("MiBy"),
metric.WithDescription("Postgres replication slot lag in MB"))
if err != nil {
@@ -772,29 +770,26 @@ func (a *FlowableActivity) RecordSlotSizes(ctx context.Context) error {
}
slotMetricGauges.SlotLagGauge = slotLagGauge

openConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenConnectionsGaugeName),
openConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenConnectionsGaugeName),
metric.WithDescription("Current open connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenConnectionsGauge = openConnectionsGauge

openReplicationConnectionsGauge, err := otel_metrics.GetOrInitInt64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Int64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.OpenReplicationConnectionsGaugeName),
openReplicationConnectionsGauge, err := a.OtelManager.GetOrInitInt64Gauge(
otel_metrics.BuildMetricName(otel_metrics.OpenReplicationConnectionsGaugeName),
metric.WithDescription("Current open replication connections for PeerDB user"))
if err != nil {
logger.Error("Failed to get open replication connections gauge", slog.Any("error", err))
return
}
slotMetricGauges.OpenReplicationConnectionsGauge = openReplicationConnectionsGauge

intervalSinceLastNormalizeGauge, err := otel_metrics.GetOrInitFloat64SyncGauge(a.OtelManager.Meter,
a.OtelManager.Float64GaugesCache,
peerdb_gauges.BuildGaugeName(peerdb_gauges.IntervalSinceLastNormalizeGaugeName),
intervalSinceLastNormalizeGauge, err := a.OtelManager.GetOrInitFloat64Gauge(
otel_metrics.BuildMetricName(otel_metrics.IntervalSinceLastNormalizeGaugeName),
metric.WithUnit("s"),
metric.WithDescription("Interval since last normalize"))
if err != nil {
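These hunks replace the free-function helpers (`otel_metrics.GetOrInitFloat64SyncGauge` and friends, which took the `Meter` and a cache explicitly) with methods on `OtelManager`. The helper itself is not among the files shown; the following is a minimal sketch of what such a cache-backed method could look like, assuming the `OtelManager` fields initialized in `flow/cmd/worker.go` (the real implementation in the `otel_metrics` package may differ):

```go
package otel_metrics

import "go.opentelemetry.io/otel/metric"

// Sketch only: the field set mirrors the struct literal in flow/cmd/worker.go;
// the real OtelManager may carry more state (e.g. the MetricsProvider).
type OtelManager struct {
	Meter              metric.Meter
	Float64GaugesCache map[string]metric.Float64Gauge
	Int64GaugesCache   map[string]metric.Int64Gauge
	Int64CountersCache map[string]metric.Int64Counter
}

// GetOrInitFloat64Gauge returns the cached gauge for name, creating and
// caching it on first use so repeated RecordSlotSizes runs reuse instruments.
func (om *OtelManager) GetOrInitFloat64Gauge(
	name string, opts ...metric.Float64GaugeOption,
) (metric.Float64Gauge, error) {
	if gauge, ok := om.Float64GaugesCache[name]; ok {
		return gauge, nil
	}
	gauge, err := om.Meter.Float64Gauge(name, opts...)
	if err != nil {
		return nil, err
	}
	om.Float64GaugesCache[name] = gauge
	return gauge, nil
}
```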
5 changes: 3 additions & 2 deletions flow/activities/flowable_core.go
@@ -23,6 +23,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
@@ -113,7 +114,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
options *protos.SyncFlowOptions,
sessionID string,
adaptStream func(*model.CDCStream[Items]) (*model.CDCStream[Items], error),
pull func(TPull, context.Context, *pgxpool.Pool, *model.PullRecordsRequest[Items]) error,
pull func(TPull, context.Context, *pgxpool.Pool, *otel_metrics.OtelManager, *model.PullRecordsRequest[Items]) error,
sync func(TSync, context.Context, *model.SyncRecordsRequest[Items]) (*model.SyncResponse, error),
) (*model.SyncCompositeResponse, error) {
flowName := config.FlowJobName
@@ -181,7 +182,7 @@
startTime := time.Now()
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return pull(srcConn, errCtx, a.CatalogPool, &model.PullRecordsRequest[Items]{
return pull(srcConn, errCtx, a.CatalogPool, a.OtelManager, &model.PullRecordsRequest[Items]{
FlowJobName: flowName,
SrcTableIDNameMapping: options.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
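These hunks thread the activity's `*otel_metrics.OtelManager` through `syncCore`'s generic `pull` callback. Callers presumably keep passing method expressions, which pick up the widened signature automatically; a hypothetical compile-time check of that compatibility (the concrete call site is not shown in this diff):

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/PeerDB-io/peer-flow/connectors"
	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/otel_metrics"
)

// A method expression on the interface takes the receiver as its first
// argument, so it matches syncCore's new pull callback type without an adapter.
var _ func(
	connectors.CDCPullConnector,
	context.Context,
	*pgxpool.Pool,
	*otel_metrics.OtelManager,
	*model.PullRecordsRequest[model.RecordItems],
) error = connectors.CDCPullConnector.PullRecords
```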
20 changes: 18 additions & 2 deletions flow/alerting/alerting.go
@@ -21,6 +21,7 @@ import (
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/shared/telemetry"
"github.com/PeerDB-io/peer-flow/tags"
)

// alerting service, no cool name :(
@@ -366,13 +367,24 @@ func (a *Alerter) sendTelemetryMessage(
flowName string,
more string,
level telemetry.Level,
tags ...string,
additionalTags ...string,
) {
allTags := []string{flowName, peerdbenv.PeerDBDeploymentUID()}
allTags = append(allTags, additionalTags...)

if flowTags, err := tags.GetTags(ctx, a.CatalogPool, flowName); err != nil {
logger.Warn("failed to get flow tags", slog.Any("error", err))
} else {
for key, value := range flowTags {
allTags = append(allTags, fmt.Sprintf("%s:%s", key, value))
}
}

details := fmt.Sprintf("[%s] %s", flowName, more)
attributes := telemetry.Attributes{
Level: level,
DeploymentUID: peerdbenv.PeerDBDeploymentUID(),
Tags: append([]string{flowName, peerdbenv.PeerDBDeploymentUID()}, tags...),
Tags: allTags,
Type: flowName,
}

@@ -440,6 +452,10 @@ func (a *Alerter) LogFlowError(ctx context.Context, flowName string, err error)
if errors.As(err, &pgErr) {
tags = append(tags, "pgcode:"+pgErr.Code)
}
var netErr *net.OpError
if errors.As(err, &netErr) {
tags = append(tags, "err:Net")
}
a.sendTelemetryMessage(ctx, logger, flowName, errorWithStack, telemetry.ERROR, tags...)
}

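The new `*net.OpError` branch uses `errors.As`, so network failures pick up the `err:Net` tag even when they arrive wrapped. A small self-contained illustration (flow name and error text are placeholders):

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

func main() {
	// A dial failure wrapped further up the stack, as connector code typically does.
	base := &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}
	err := fmt.Errorf("pulling records: %w", base)

	tags := []string{"flow:example-flow"}
	var netErr *net.OpError
	if errors.As(err, &netErr) { // walks the wrap chain, unlike a type assertion
		tags = append(tags, "err:Net")
	}
	fmt.Println(tags) // [flow:example-flow err:Net]
}
```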
84 changes: 84 additions & 0 deletions flow/cmd/tags_handler.go
@@ -0,0 +1,84 @@
package cmd

import (
"context"
"fmt"
"log/slog"

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/tags"
)

func (h *FlowRequestHandler) flowExists(ctx context.Context, flowName string) (bool, error) {
var exists bool
err := h.pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM flows WHERE name = $1)", flowName).Scan(&exists)
if err != nil {
slog.Error("error checking if flow exists", slog.Any("error", err))
return false, err
}

slog.Info(fmt.Sprintf("flow %s exists: %t", flowName, exists))
return exists, nil
}

func (h *FlowRequestHandler) CreateOrReplaceFlowTags(
ctx context.Context,
in *protos.CreateOrReplaceFlowTagsRequest,
) (*protos.CreateOrReplaceFlowTagsResponse, error) {
flowName := in.FlowName

exists, err := h.flowExists(ctx, flowName)
if err != nil {
return nil, err
}

if !exists {
slog.Error("flow does not exist", slog.String("flow_name", flowName))
return nil, fmt.Errorf("flow %s does not exist", flowName)
}

tags := make(map[string]string, len(in.Tags))
for _, tag := range in.Tags {
tags[tag.Key] = tag.Value
}

_, err = h.pool.Exec(ctx, "UPDATE flows SET tags = $1 WHERE name = $2", tags, flowName)
if err != nil {
slog.Error("error updating flow tags", slog.Any("error", err))
return nil, err
}

return &protos.CreateOrReplaceFlowTagsResponse{
FlowName: flowName,
}, nil
}

func (h *FlowRequestHandler) GetFlowTags(ctx context.Context, in *protos.GetFlowTagsRequest) (*protos.GetFlowTagsResponse, error) {
flowName := in.FlowName

exists, err := h.flowExists(ctx, flowName)
if err != nil {
return nil, err
}

if !exists {
slog.Error("flow does not exist", slog.String("flow_name", flowName))
return nil, fmt.Errorf("flow %s does not exist", flowName)
}

tags, err := tags.GetTags(ctx, h.pool, flowName)
if err != nil {
slog.Error("error getting flow tags", slog.Any("error", err))
return nil, err
}

protosTags := make([]*protos.FlowTag, 0, len(tags))
for key, value := range tags {
protosTags = append(protosTags, &protos.FlowTag{Key: key, Value: value})
}

return &protos.GetFlowTagsResponse{
FlowName: flowName,
Tags: protosTags,
}, nil
}
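Both this handler and the alerting change call `tags.GetTags`, which lives in the new `flow/tags` package and is not among the files shown. A hedged sketch of what it plausibly looks like, assuming `flows.tags` is a JSONB column that pgx can scan into a `map[string]string` (the real package may differ):

```go
package tags

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
)

// GetTags returns the key/value tags stored for a flow. COALESCE guards
// against rows where the tags column has never been set.
func GetTags(ctx context.Context, pool *pgxpool.Pool, flowName string) (map[string]string, error) {
	tags := make(map[string]string)
	if err := pool.QueryRow(ctx,
		"SELECT COALESCE(tags, '{}'::jsonb) FROM flows WHERE name = $1", flowName,
	).Scan(&tags); err != nil {
		return nil, err
	}
	return tags, nil
}
```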
32 changes: 16 additions & 16 deletions flow/cmd/worker.go
@@ -35,9 +35,18 @@ type WorkerSetupOptions struct {
}

type workerSetupResponse struct {
Client client.Client
Worker worker.Worker
Cleanup func()
Client client.Client
Worker worker.Worker
OtelManager *otel_metrics.OtelManager
}

func (w *workerSetupResponse) Close() {
w.Client.Close()
if w.OtelManager != nil {
if err := w.OtelManager.Close(context.Background()); err != nil {
slog.Error("Failed to shutdown metrics provider", slog.Any("error", err))
}
}
}

func setupPyroscope(opts *WorkerSetupOptions) {
@@ -148,7 +157,6 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
})
peerflow.RegisterFlowWorkerWorkflows(w)

cleanupOtelManagerFunc := func() {}
var otelManager *otel_metrics.OtelManager
if opts.EnableOtelMetrics {
metricsProvider, metricsErr := otel_metrics.SetupPeerDBMetricsProvider("flow-worker")
@@ -160,12 +168,7 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
Meter: metricsProvider.Meter("io.peerdb.flow-worker"),
Float64GaugesCache: make(map[string]metric.Float64Gauge),
Int64GaugesCache: make(map[string]metric.Int64Gauge),
}
cleanupOtelManagerFunc = func() {
shutDownErr := otelManager.MetricsProvider.Shutdown(context.Background())
if shutDownErr != nil {
slog.Error("Failed to shutdown metrics provider", slog.Any("error", shutDownErr))
}
Int64CountersCache: make(map[string]metric.Int64Counter),
}
}
w.RegisterActivity(&activities.FlowableActivity{
@@ -182,11 +185,8 @@ func WorkerSetup(opts *WorkerSetupOptions) (*workerSetupResponse, error) {
})

return &workerSetupResponse{
Client: c,
Worker: w,
Cleanup: func() {
cleanupOtelManagerFunc()
c.Close()
},
Client: c,
Worker: w,
OtelManager: otelManager,
}, nil
}
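With the `Cleanup` field gone, shutdown moves into the new `Close` method, which closes the Temporal client and, when OTel metrics are enabled, shuts down the metrics provider via `OtelManager.Close`. A hypothetical call site (the real `main` wiring is not part of this diff):

```go
package main

import (
	"log/slog"

	"go.temporal.io/sdk/worker"

	"github.com/PeerDB-io/peer-flow/cmd"
)

func runWorker(opts *cmd.WorkerSetupOptions) error {
	res, err := cmd.WorkerSetup(opts)
	if err != nil {
		return err
	}
	defer res.Close() // replaces the old deferred res.Cleanup()

	if err := res.Worker.Run(worker.InterruptCh()); err != nil {
		slog.Error("worker stopped", slog.Any("error", err))
		return err
	}
	return nil
}

func main() {
	// Flag parsing and opts construction omitted; see flow/cmd for the real entrypoint.
}
```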
5 changes: 3 additions & 2 deletions flow/connectors/clickhouse/clickhouse.go
@@ -228,11 +228,12 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
tlsSetting.RootCAs = caPool
}

var settings clickhouse.Settings
// See: https://clickhouse.com/docs/en/cloud/reference/shared-merge-tree#consistency
settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)}
if maxInsertThreads, err := peerdbenv.PeerDBClickHouseMaxInsertThreads(ctx, env); err != nil {
return nil, fmt.Errorf("failed to load max_insert_threads config: %w", err)
} else if maxInsertThreads != 0 {
settings = clickhouse.Settings{"max_insert_threads": maxInsertThreads}
settings["max_insert_threads"] = maxInsertThreads
}

conn, err := clickhouse.Open(&clickhouse.Options{
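Previously the settings map was only created when `max_insert_threads` was configured; now every connection sets `select_sequential_consistency = 1` (read-after-write consistency on ClickHouse Cloud's SharedMergeTree, per the linked doc) and layers `max_insert_threads` on top of it. A sketch of how the map feeds into `clickhouse.Open`, with placeholder connection details:

```go
package main

import (
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

func openClickHouse(maxInsertThreads uint64) (driver.Conn, error) {
	// Always-on consistency setting, mirroring the diff above.
	settings := clickhouse.Settings{"select_sequential_consistency": uint64(1)}
	if maxInsertThreads != 0 { // the real connector loads this via peerdbenv
		settings["max_insert_threads"] = maxInsertThreads
	}
	return clickhouse.Open(&clickhouse.Options{
		Addr:     []string{"localhost:9000"}, // placeholder address
		Settings: settings,
	})
}

func main() {}
```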
2 changes: 1 addition & 1 deletion flow/connectors/clickhouse/normalize.go
@@ -477,7 +477,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
c.logger.Error("[clickhouse] context canceled while normalizing",
slog.Any("error", errCtx.Err()),
slog.Any("cause", context.Cause(errCtx)))
return nil, errCtx.Err()
return nil, context.Cause(errCtx)
}
}
close(queries)
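Returning `context.Cause(errCtx)` instead of `errCtx.Err()` surfaces the error that actually cancelled the error group (recent `errgroup` versions record the first worker error as the cancellation cause) rather than a bare `context.Canceled`. A minimal illustration of the difference:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errors.New("normalize query failed: table does not exist"))

	fmt.Println(ctx.Err())          // context canceled
	fmt.Println(context.Cause(ctx)) // normalize query failed: table does not exist
}
```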
18 changes: 14 additions & 4 deletions flow/connectors/core.go
@@ -23,7 +23,7 @@ import (
connsqlserver "github.com/PeerDB-io/peer-flow/connectors/sqlserver"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/otel_metrics/peerdb_gauges"
"github.com/PeerDB-io/peer-flow/otel_metrics"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
)
@@ -85,7 +85,7 @@ type CDCPullConnectorCore interface {
alerter *alerting.Alerter,
catalogPool *pgxpool.Pool,
alertKeys *alerting.AlertKeys,
slotMetricGauges peerdb_gauges.SlotMetricGauges,
slotMetricGauges otel_metrics.SlotMetricGauges,
) error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
Expand All @@ -102,15 +102,25 @@ type CDCPullConnector interface {
CDCPullConnectorCore

// This method should be idempotent, and should be able to be called multiple times with the same request.
PullRecords(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.RecordItems]) error
PullRecords(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.RecordItems],
) error
}

type CDCPullPgConnector interface {
CDCPullConnectorCore

// This method should be idempotent, and should be able to be called multiple times with the same request.
// Its signature, aside from the type parameter, should match CDCPullConnector.PullRecords.
PullPg(ctx context.Context, catalogPool *pgxpool.Pool, req *model.PullRecordsRequest[model.PgItems]) error
PullPg(
ctx context.Context,
catalogPool *pgxpool.Pool,
otelManager *otel_metrics.OtelManager,
req *model.PullRecordsRequest[model.PgItems],
) error
}

type NormalizedTablesConnector interface {
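For connector authors, the widened interfaces mean `PullRecords` (and `PullPg`) now receive the worker's `OtelManager`, which is nil when OTel metrics are disabled (see `flow/cmd/worker.go`). A partial, hypothetical stub showing just the new method shape:

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/PeerDB-io/peer-flow/model"
	"github.com/PeerDB-io/peer-flow/otel_metrics"
)

// examplePullConnector sketches only PullRecords; a real connector must also
// implement the rest of CDCPullConnectorCore.
type examplePullConnector struct{}

func (c *examplePullConnector) PullRecords(
	ctx context.Context,
	catalogPool *pgxpool.Pool,
	otelManager *otel_metrics.OtelManager,
	req *model.PullRecordsRequest[model.RecordItems],
) error {
	// otelManager is nil when OTel metrics are disabled, so guard any use of it.
	// ... read changes from the source and hand them to the stream carried by req ...
	return nil
}
```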
17 additional changed files not shown.
