Skip to content

Commit

Permalink
ingest: service created (#14)
Browse files Browse the repository at this point in the history
What
Implements the ingestion service in the wallet backend.

Why
New feature.
  • Loading branch information
daniel-burghardt authored May 29, 2024
1 parent 2b5e625 commit a8ec80a
Show file tree
Hide file tree
Showing 25 changed files with 1,593 additions and 45 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Visual Studio Code
.vscode
.vscode
captive-core*/
112 changes: 112 additions & 0 deletions cmd/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package cmd

import (
"go/types"

_ "github.com/lib/pq"
"github.com/spf13/cobra"
"github.com/stellar/go/network"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/cmd/utils"
"github.com/stellar/wallet-backend/internal/ingest"
)

type ingestCmd struct{}

func (c *ingestCmd) Command() *cobra.Command {
cfg := ingest.Configs{}
cfgOpts := config.ConfigOptions{
{
Name: "database-url",
Usage: "Database connection URL.",
OptType: types.String,
ConfigKey: &cfg.DatabaseURL,
FlagDefault: "postgres://postgres@localhost:5432/wallet-backend?sslmode=disable",
Required: true,
},
{
Name: "network-passphrase",
Usage: "Stellar Network Passphrase to connect.",
OptType: types.String,
ConfigKey: &cfg.NetworkPassphrase,
FlagDefault: network.TestNetworkPassphrase,
Required: true,
},
{
Name: "log-level",
Usage: `The log level used in this project. Options: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", or "PANIC".`,
OptType: types.String,
FlagDefault: "TRACE",
ConfigKey: &cfg.LogLevel,
CustomSetValue: utils.SetConfigOptionLogLevel,
Required: false,
},
{
Name: "captive-core-bin-path",
Usage: "Path to Captive Core's binary file.",
OptType: types.String,
CustomSetValue: utils.SetConfigOptionCaptiveCoreBinPath,
ConfigKey: &cfg.CaptiveCoreBinPath,
FlagDefault: "/usr/local/bin/stellar-core",
Required: true,
},
{
Name: "captive-core-config-dir",
Usage: "Path to Captive Core's configuration files directory.",
OptType: types.String,
CustomSetValue: utils.SetConfigOptionCaptiveCoreConfigDir,
ConfigKey: &cfg.CaptiveCoreConfigDir,
FlagDefault: "./internal/ingest/config",
Required: true,
},
{
Name: "ledger-cursor-name",
Usage: "Name of last synced ledger cursor, used to keep track of the last ledger ingested by the service. When starting up, ingestion will resume from the ledger number stored in this record. It should be an unique name per container as different containers would overwrite the cursor value of its peers when using the same cursor name.",
OptType: types.String,
ConfigKey: &cfg.LedgerCursorName,
Required: true,
},
{
Name: "start",
Usage: "Ledger number from which ingestion should start. When not present, ingestion will resume from last synced ledger.",
OptType: types.Int,
ConfigKey: &cfg.StartLedger,
FlagDefault: 0,
Required: false,
},
{
Name: "end",
Usage: "Ledger number up to which ingestion should run. When not present, ingestion run indefinitely (live ingestion requires it to be empty).",
OptType: types.Int,
ConfigKey: &cfg.EndLedger,
FlagDefault: 0,
Required: false,
},
}

cmd := &cobra.Command{
Use: "ingest",
Short: "Run Ingestion service",
PersistentPreRun: func(_ *cobra.Command, _ []string) {
cfgOpts.Require()
if err := cfgOpts.SetValues(); err != nil {
log.Fatalf("Error setting values of config options: %s", err.Error())
}
},
Run: func(_ *cobra.Command, _ []string) {
c.Run(cfg)
},
}
if err := cfgOpts.Init(cmd); err != nil {
log.Fatalf("Error initializing a config option: %s", err.Error())
}
return cmd
}

func (c *ingestCmd) Run(cfg ingest.Configs) {
err := ingest.Ingest(cfg)
if err != nil {
log.Fatalf("Error running Ingest: %s", err.Error())
}
}
11 changes: 4 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package cmd

import (
"log"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/log"
)

// rootCmd represents the base command when called without any subcommands
Expand All @@ -30,8 +27,8 @@ func Execute() {
}

func init() {
logger := supportlog.New()
logger.SetLevel(logrus.TraceLevel)
log.DefaultLogger = log.New()

rootCmd.AddCommand((&serveCmd{Logger: logger}).Command())
rootCmd.AddCommand((&serveCmd{}).Command())
rootCmd.AddCommand((&ingestCmd{}).Command())
}
25 changes: 15 additions & 10 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@ import (
_ "github.com/lib/pq"
"github.com/spf13/cobra"
"github.com/stellar/go/support/config"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/log"
"github.com/stellar/wallet-backend/cmd/utils"
"github.com/stellar/wallet-backend/internal/serve"
)

type serveCmd struct {
Logger *supportlog.Entry
}
type serveCmd struct{}

func (c *serveCmd) Command() *cobra.Command {
cfg := serve.Configs{
Logger: c.Logger,
}
cfg := serve.Configs{}
cfgOpts := config.ConfigOptions{
{
Name: "port",
Expand All @@ -44,6 +40,15 @@ func (c *serveCmd) Command() *cobra.Command {
FlagDefault: "http://localhost:8000",
Required: true,
},
{
Name: "log-level",
Usage: `The log level used in this project. Options: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", or "PANIC".`,
OptType: types.String,
FlagDefault: "TRACE",
ConfigKey: &cfg.LogLevel,
CustomSetValue: utils.SetConfigOptionLogLevel,
Required: false,
},
{
Name: "wallet-signing-key",
Usage: "The public key of the Stellar account that signs the payloads when making HTTP Request to this server.",
Expand All @@ -59,22 +64,22 @@ func (c *serveCmd) Command() *cobra.Command {
PersistentPreRun: func(_ *cobra.Command, _ []string) {
cfgOpts.Require()
if err := cfgOpts.SetValues(); err != nil {
c.Logger.Fatalf("Error setting values of config options: %s", err.Error())
log.Fatalf("Error setting values of config options: %s", err.Error())
}
},
Run: func(_ *cobra.Command, _ []string) {
c.Run(cfg)
},
}
if err := cfgOpts.Init(cmd); err != nil {
c.Logger.Fatalf("Error initializing a config option: %s", err.Error())
log.Fatalf("Error initializing a config option: %s", err.Error())
}
return cmd
}

func (c *serveCmd) Run(cfg serve.Configs) {
err := serve.Serve(cfg)
if err != nil {
c.Logger.Fatalf("Error running Serve: %s", err.Error())
log.Fatalf("Error running Serve: %s", err.Error())
}
}
76 changes: 75 additions & 1 deletion cmd/utils/custom_set_value.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,45 @@
package utils

import (
"errors"
"fmt"
"os"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/stellar/go/keypair"
"github.com/stellar/go/support/config"
"github.com/stellar/go/support/log"
)

func unexpectedTypeError(key any, co *config.ConfigOption) error {
return fmt.Errorf("the expected type for the config key in %s is %T, but a %T was provided instead", co.Name, key, co.ConfigKey)
}

func SetConfigOptionLogLevel(co *config.ConfigOption) error {
logLevelStr := viper.GetString(co.Name)
logLevel, err := logrus.ParseLevel(logLevelStr)
if err != nil {
return fmt.Errorf("couldn't parse log level in %s: %w", co.Name, err)
}

key, ok := co.ConfigKey.(*logrus.Level)
if !ok {
return fmt.Errorf("%s configKey has an invalid type %T", co.Name, co.ConfigKey)
}
*key = logLevel

// Log for debugging
if config.IsExplicitlySet(co) {
log.Debugf("Setting log level to: %s", logLevel)
log.DefaultLogger.SetLevel(*key)
} else {
log.Debugf("Using default log level: %s", logLevel)
}

return nil
}

func SetConfigOptionStellarPublicKey(co *config.ConfigOption) error {
publicKey := viper.GetString(co.Name)

Expand All @@ -18,9 +50,51 @@ func SetConfigOptionStellarPublicKey(co *config.ConfigOption) error {

key, ok := co.ConfigKey.(*string)
if !ok {
return fmt.Errorf("the expected type for the config key in %s is a string, but a %T was provided instead", co.Name, co.ConfigKey)
return unexpectedTypeError(key, co)
}
*key = kp.Address()

return nil
}

func SetConfigOptionCaptiveCoreBinPath(co *config.ConfigOption) error {
binPath := viper.GetString(co.Name)

fileInfo, err := os.Stat(binPath)
if errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("binary file %s does not exist", binPath)
}

if fileInfo.IsDir() {
return fmt.Errorf("binary file path %s is a directory, not a file", binPath)
}

key, ok := co.ConfigKey.(*string)
if !ok {
return unexpectedTypeError(key, co)
}
*key = binPath

return nil
}

func SetConfigOptionCaptiveCoreConfigDir(co *config.ConfigOption) error {
dirPath := viper.GetString(co.Name)

fileInfo, err := os.Stat(dirPath)
if errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("captive core configuration files dir %s does not exist", dirPath)
}

if !fileInfo.IsDir() {
return fmt.Errorf("captive core configuration files dir %s is not a directory", dirPath)
}

key, ok := co.ConfigKey.(*string)
if !ok {
return unexpectedTypeError(key, co)
}
*key = dirPath

return nil
}
Loading

0 comments on commit a8ec80a

Please sign in to comment.