Skip to content

Commit

Permalink
Merge pull request #1397 from openmeterio/refactor-pg-init
Browse files Browse the repository at this point in the history
refactor: postgres/ent driver initialization
  • Loading branch information
chrisgacsal authored Aug 21, 2024
2 parents 449e328 + cc79db5 commit e226228
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 154 deletions.
68 changes: 36 additions & 32 deletions cmd/balance-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"syscall"
"time"

"entgo.io/ent/dialect/sql"
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
Expand All @@ -30,7 +29,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
entitlementpgadapter "github.com/openmeterio/openmeter/internal/entitlement/adapter"
"github.com/openmeterio/openmeter/internal/entitlement/balanceworker"
"github.com/openmeterio/openmeter/internal/meter"
Expand All @@ -42,8 +40,9 @@ import (
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/internal/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/driver"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
"github.com/openmeterio/openmeter/pkg/gosundheit"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
Expand Down Expand Up @@ -211,14 +210,42 @@ func main() {
}

// Dependencies: postgresql
pgClients, err := initPGClients(conf.Postgres)

// Initialize Postgres driver
postgresDriver, err := pgdriver.NewPostgresDriver(
ctx,
conf.Postgres.URL,
pgdriver.WithTracerProvider(otelTracerProvider),
pgdriver.WithMeterProvider(otelMeterProvider),
)
if err != nil {
logger.Error("failed to initialize postgres clients", "error", err)
logger.Error("failed to initialize postgres driver", "error", err)
os.Exit(1)
}
defer pgClients.driver.Close()

logger.Info("Postgres clients initialized")
defer func() {
if err = postgresDriver.Close(); err != nil {
logger.Error("failed to close postgres driver", "error", err)
}
}()

// Initialize Ent driver
entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB())
defer func() {
if err = entPostgresDriver.Close(); err != nil {
logger.Error("failed to close ent driver", "error", err)
}
}()

entClient := entPostgresDriver.Client()

// Run database schema creation
err = entClient.Schema.Create(ctx)
if err != nil {
logger.Error("failed to create database schema", "error", err)
}

logger.Info("Postgres client initialized")

// Create subscriber
wmBrokerConfig := wmBrokerConfiguration(conf, logger, metricMeter)
Expand Down Expand Up @@ -259,7 +286,7 @@ func main() {

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
DatabaseClient: entClient,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Expand All @@ -282,7 +309,7 @@ func main() {
EventBus: eventPublisher,

Entitlement: entitlementConnectors,
Repo: entitlementpgadapter.NewPostgresEntitlementRepo(pgClients.client),
Repo: entitlementpgadapter.NewPostgresEntitlementRepo(entClient),

Logger: logger,
}
Expand Down Expand Up @@ -350,26 +377,3 @@ func initEventPublisherDriver(ctx context.Context, broker watermillkafka.BrokerO
ProvisionTopics: provisionTopics,
})
}

type pgClients struct {
driver *sql.Driver
client *db.Client
}

func initPGClients(config config.PostgresConfig) (
*pgClients,
error,
) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid postgres config: %w", err)
}
driver, err := entutils.GetPGDriver(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
}

return &pgClients{
driver: driver,
client: db.NewClient(db.Driver(driver)),
}, nil
}
28 changes: 19 additions & 9 deletions cmd/jobs/entitlement/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/registry"
registrybuilder "github.com/openmeterio/openmeter/internal/registry/builder"
"github.com/openmeterio/openmeter/internal/streaming/clickhouse_connector"
watermillkafka "github.com/openmeterio/openmeter/internal/watermill/driver/kafka"
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/driver"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
)
Expand All @@ -28,13 +28,16 @@ type entitlementConnectors struct {
}

func initEntitlements(ctx context.Context, conf config.Configuration, logger *slog.Logger, metricMeter metric.Meter, otelName string) (*entitlementConnectors, error) {
// Postgresql
entDriver, err := entutils.GetPGDriver(conf.Postgres.URL)
// Initialize Postgres driver
postgresDriver, err := pgdriver.NewPostgresDriver(ctx, conf.Postgres.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
return nil, fmt.Errorf("error initializing postgres driver: %w", err)
}

dbClient := db.NewClient(db.Driver(entDriver))
// Initialize Ent driver
entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB())

logger.Info("Postgres client initialized")

// Meter repository
meterRepository := meter.NewInMemoryRepository(slicesx.Map(conf.Meters, func(meter *models.Meter) models.Meter {
Expand Down Expand Up @@ -84,7 +87,7 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
}

entitlementRegistry := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: dbClient,
DatabaseClient: entPostgresDriver.Client(),
StreamingConnector: streamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Expand All @@ -95,8 +98,15 @@ func initEntitlements(ctx context.Context, conf config.Configuration, logger *sl
Registry: entitlementRegistry,
EventBus: eventPublisher,
Shutdown: func() {
if err := dbClient.Close(); err != nil {
logger.Error("failed to close entitlement db client", "error", err)
if err := entPostgresDriver.Close(); err != nil {
logger.Error("failed to close ent driver", "error", err)
}

if postgresDriver != nil {
err := postgresDriver.Close()
if err != nil {
logger.Error("failed to close postgres driver", "error", err)
}
}

if err := clickHouseClient.Close(); err != nil {
Expand Down
66 changes: 35 additions & 31 deletions cmd/notification-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"syscall"
"time"

"entgo.io/ent/dialect/sql"
health "github.com/AppsFlyer/go-sundheit"
healthhttp "github.com/AppsFlyer/go-sundheit/http"
"github.com/ClickHouse/clickhouse-go/v2"
Expand All @@ -30,7 +29,6 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"

"github.com/openmeterio/openmeter/config"
"github.com/openmeterio/openmeter/internal/ent/db"
"github.com/openmeterio/openmeter/internal/meter"
"github.com/openmeterio/openmeter/internal/notification/consumer"
"github.com/openmeterio/openmeter/internal/registry"
Expand All @@ -41,8 +39,9 @@ import (
"github.com/openmeterio/openmeter/internal/watermill/eventbus"
"github.com/openmeterio/openmeter/internal/watermill/router"
"github.com/openmeterio/openmeter/pkg/contextx"
"github.com/openmeterio/openmeter/pkg/framework/entutils"
entdriver "github.com/openmeterio/openmeter/pkg/framework/entutils/driver"
"github.com/openmeterio/openmeter/pkg/framework/operation"
"github.com/openmeterio/openmeter/pkg/framework/pgdriver"
"github.com/openmeterio/openmeter/pkg/gosundheit"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/slicesx"
Expand Down Expand Up @@ -210,14 +209,42 @@ func main() {
}

// Dependencies: postgresql
pgClients, err := initPGClients(conf.Postgres)

// Initialize Postgres driver
postgresDriver, err := pgdriver.NewPostgresDriver(
ctx,
conf.Postgres.URL,
pgdriver.WithTracerProvider(otelTracerProvider),
pgdriver.WithMeterProvider(otelMeterProvider),
)
if err != nil {
logger.Error("failed to initialize postgres clients", "error", err)
logger.Error("failed to initialize postgres driver", "error", err)
os.Exit(1)
}
defer pgClients.driver.Close()

logger.Info("Postgres clients initialized")
defer func() {
if err = postgresDriver.Close(); err != nil {
logger.Error("failed to close postgres driver", "error", err)
}
}()

// Initialize Ent driver
entPostgresDriver := entdriver.NewEntPostgresDriver(postgresDriver.DB())
defer func() {
if err = entPostgresDriver.Close(); err != nil {
logger.Error("failed to close ent driver", "error", err)
}
}()

entClient := entPostgresDriver.Client()

// Run database schema creation
err = entClient.Schema.Create(ctx)
if err != nil {
logger.Error("failed to create database schema", "error", err)
}

logger.Info("Postgres client initialized")

// Create subscriber
wmSubscriber, err := watermillkafka.NewSubscriber(watermillkafka.SubscriberOptions{
Expand Down Expand Up @@ -256,7 +283,7 @@ func main() {

// Dependencies: entitlement
entitlementConnectors := registrybuilder.GetEntitlementRegistry(registry.EntitlementOptions{
DatabaseClient: pgClients.client,
DatabaseClient: entClient,
StreamingConnector: clickhouseStreamingConnector,
MeterRepository: meterRepository,
Logger: logger,
Expand Down Expand Up @@ -343,26 +370,3 @@ func initEventPublisherDriver(ctx context.Context, logger *slog.Logger, conf con
ProvisionTopics: provisionTopics,
})
}

type pgClients struct {
driver *sql.Driver
client *db.Client
}

func initPGClients(config config.PostgresConfig) (
*pgClients,
error,
) {
if err := config.Validate(); err != nil {
return nil, fmt.Errorf("invalid postgres config: %w", err)
}
driver, err := entutils.GetPGDriver(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to init postgres driver: %w", err)
}

return &pgClients{
driver: driver,
client: db.NewClient(db.Driver(driver)),
}, nil
}
Loading

0 comments on commit e226228

Please sign in to comment.