Skip to content

Commit

Permalink
payments: get endpoint (#28)
Browse files Browse the repository at this point in the history
What
Adds a new GET /payments endpoint for fetching the ingested payments.

Why
New endpoint.
  • Loading branch information
daniel-burghardt authored Jun 26, 2024
1 parent f41bed0 commit 6be7be6
Show file tree
Hide file tree
Showing 17 changed files with 993 additions and 280 deletions.
17 changes: 17 additions & 0 deletions internal/data/fixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package data

import (
"context"
"testing"

"github.com/stellar/wallet-backend/internal/db"
"github.com/stretchr/testify/require"
)

func InsertTestPayments(t *testing.T, ctx context.Context, payments []Payment, connectionPool db.ConnectionPool) {
t.Helper()

const query = `INSERT INTO ingest_payments (operation_id, operation_type, transaction_id, transaction_hash, from_address, to_address, src_asset_code, src_asset_issuer, src_amount, dest_asset_code, dest_asset_issuer, dest_amount, created_at, memo) VALUES (:operation_id, :operation_type, :transaction_id, :transaction_hash, :from_address, :to_address, :src_asset_code, :src_asset_issuer, :src_amount, :dest_asset_code, :dest_asset_issuer, :dest_amount, :created_at, :memo);`
_, err := connectionPool.NamedExecContext(ctx, query, payments)
require.NoError(t, err)
}
136 changes: 122 additions & 14 deletions internal/data/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package data
import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand All @@ -14,20 +15,20 @@ type PaymentModel struct {
}

type Payment struct {
OperationID int64 `db:"operation_id"`
OperationType string `db:"operation_type"`
TransactionID int64 `db:"transaction_id"`
TransactionHash string `db:"transaction_hash"`
FromAddress string `db:"from_address"`
ToAddress string `db:"to_address"`
SrcAssetCode string `db:"src_asset_code"`
SrcAssetIssuer string `db:"src_asset_issuer"`
SrcAmount int64 `db:"src_amount"`
DestAssetCode string `db:"dest_asset_code"`
DestAssetIssuer string `db:"dest_asset_issuer"`
DestAmount int64 `db:"dest_amount"`
CreatedAt time.Time `db:"created_at"`
Memo *string `db:"memo"`
OperationID int64 `db:"operation_id" json:"operationId"`
OperationType string `db:"operation_type" json:"operationType"`
TransactionID int64 `db:"transaction_id" json:"transactionId"`
TransactionHash string `db:"transaction_hash" json:"transactionHash"`
FromAddress string `db:"from_address" json:"fromAddress"`
ToAddress string `db:"to_address" json:"toAddress"`
SrcAssetCode string `db:"src_asset_code" json:"srcAssetCode"`
SrcAssetIssuer string `db:"src_asset_issuer" json:"srcAssetIssuer"`
SrcAmount int64 `db:"src_amount" json:"srcAmount"`
DestAssetCode string `db:"dest_asset_code" json:"destAssetCode"`
DestAssetIssuer string `db:"dest_asset_issuer" json:"destAssetIssuer"`
DestAmount int64 `db:"dest_amount" json:"destAmount"`
CreatedAt time.Time `db:"created_at" json:"createdAt"`
Memo *string `db:"memo" json:"memo"`
}

func (m *PaymentModel) GetLatestLedgerSynced(ctx context.Context, cursorName string) (uint32, error) {
Expand Down Expand Up @@ -91,3 +92,110 @@ func (m *PaymentModel) AddPayment(ctx context.Context, tx db.Transaction, paymen

return nil
}

func (m *PaymentModel) GetPaymentsPaginated(ctx context.Context, address string, beforeID, afterID int64, sort SortOrder, limit int) ([]Payment, bool, bool, error) {
if !sort.IsValid() {
return nil, false, false, fmt.Errorf("invalid sort value: %s", sort)
}

if beforeID != 0 && afterID != 0 {
return nil, false, false, errors.New("at most one cursor may be provided, got afterId and beforeId")
}

const filteredSetCTE = `
WITH filtered_set AS (
SELECT * FROM ingest_payments WHERE :address = '' OR :address IN (from_address, to_address)
)
`

var selectQ string
if beforeID != 0 && sort == DESC {
selectQ = "SELECT * FROM (SELECT * FROM filtered_set WHERE operation_id > :before_id ORDER BY operation_id ASC LIMIT :limit) AS reverse_set ORDER BY operation_id DESC"
} else if beforeID != 0 && sort == ASC {
selectQ = "SELECT * FROM (SELECT * FROM filtered_set WHERE operation_id < :before_id ORDER BY operation_id DESC LIMIT :limit) AS reverse_set ORDER BY operation_id ASC"
} else if afterID != 0 && sort == DESC {
selectQ = "SELECT * FROM filtered_set WHERE operation_id < :after_id ORDER BY operation_id DESC LIMIT :limit"
} else if afterID != 0 && sort == ASC {
selectQ = "SELECT * FROM filtered_set WHERE operation_id > :after_id ORDER BY operation_id ASC LIMIT :limit"
} else if sort == ASC {
selectQ = "SELECT * FROM filtered_set ORDER BY operation_id ASC LIMIT :limit"
} else {
selectQ = "SELECT * FROM filtered_set ORDER BY operation_id DESC LIMIT :limit"
}

argumentsMap := map[string]interface{}{
"address": address,
"limit": limit,
"before_id": beforeID,
"after_id": afterID,
}

payments := make([]Payment, 0)
query := fmt.Sprintf("%s %s", filteredSetCTE, selectQ)
query, args, err := PrepareNamedQuery(ctx, m.DB, query, argumentsMap)
if err != nil {
return nil, false, false, fmt.Errorf("preparing named query: %w", err)
}
err = m.DB.SelectContext(ctx, &payments, query, args...)
if err != nil {
return nil, false, false, fmt.Errorf("fetching payments: %w", err)
}

prevExists, nextExists, err := m.existsPrevNext(ctx, filteredSetCTE, address, sort, payments)
if err != nil {
return nil, false, false, fmt.Errorf("checking prev and next pages: %w", err)
}

return payments, prevExists, nextExists, nil
}

func (m *PaymentModel) existsPrevNext(ctx context.Context, filteredSetCTE string, address string, sort SortOrder, payments []Payment) (bool, bool, error) {
firstElementID := FirstPaymentOperationID(payments)
lastElementID := LastPaymentOperationID(payments)

query := fmt.Sprintf(`
%s
SELECT
EXISTS(
SELECT 1 FROM filtered_set WHERE CASE WHEN :sort = 'ASC' THEN operation_id < :first_element_id WHEN :sort = 'DESC' THEN operation_id > :first_element_id END LIMIT 1
) AS prev_exists,
EXISTS(
SELECT 1 FROM filtered_set WHERE CASE WHEN :sort = 'ASC' THEN operation_id > :last_element_id WHEN :sort = 'DESC' THEN operation_id < :last_element_id END LIMIT 1
) AS next_exists
`, filteredSetCTE)

argumentsMap := map[string]interface{}{
"address": address,
"first_element_id": firstElementID,
"last_element_id": lastElementID,
"sort": sort,
}

query, args, err := PrepareNamedQuery(ctx, m.DB, query, argumentsMap)
if err != nil {
return false, false, fmt.Errorf("preparing named query: %w", err)
}

var prevExists, nextExists bool
err = m.DB.QueryRowxContext(ctx, query, args...).Scan(&prevExists, &nextExists)
if err != nil {
return false, false, fmt.Errorf("fetching prev and next exists: %w", err)
}

return prevExists, nextExists, nil
}

func FirstPaymentOperationID(payments []Payment) int64 {
if len(payments) > 0 {
return payments[0].OperationID
}
return 0
}

func LastPaymentOperationID(payments []Payment) int64 {
len := len(payments)
if len > 0 {
return payments[len-1].OperationID
}
return 0
}
116 changes: 116 additions & 0 deletions internal/data/payments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,119 @@ func TestPaymentModelUpdateLatestLedgerSynced(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, uint32(123), lastSyncedLedger)
}

func TestPaymentModelGetPayments(t *testing.T) {
dbt := dbtest.Open(t)
defer dbt.Close()
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN)
require.NoError(t, err)
defer dbConnectionPool.Close()

ctx := context.Background()
m := &PaymentModel{
DB: dbConnectionPool,
}

dbPayments := []Payment{
{OperationID: 1, OperationType: "OperationTypePayment", TransactionID: 11, TransactionHash: "c370ff20144e4c96b17432b8d14664c1", FromAddress: "GAZ37ZO4TU3H", ToAddress: "GDD2HQO6IOFT", SrcAssetCode: "XLM", SrcAssetIssuer: "", SrcAmount: 10, DestAssetCode: "XLM", DestAssetIssuer: "", DestAmount: 10, CreatedAt: time.Date(2024, 6, 21, 0, 0, 0, 0, time.UTC), Memo: nil},
{OperationID: 2, OperationType: "OperationTypePayment", TransactionID: 22, TransactionHash: "30850d8fc7d1439782885103390cd975", FromAddress: "GBZ5Q56JKHJQ", ToAddress: "GASV72SENBSY", SrcAssetCode: "XLM", SrcAssetIssuer: "", SrcAmount: 20, DestAssetCode: "XLM", DestAssetIssuer: "", DestAmount: 20, CreatedAt: time.Date(2024, 6, 22, 0, 0, 0, 0, time.UTC), Memo: nil},
{OperationID: 3, OperationType: "OperationTypePayment", TransactionID: 33, TransactionHash: "d9521ed7057d4d1e9b9dd22ab515cbf1", FromAddress: "GAYFAYPOECBT", ToAddress: "GDWDPNMALNIT", SrcAssetCode: "XLM", SrcAssetIssuer: "", SrcAmount: 30, DestAssetCode: "XLM", DestAssetIssuer: "", DestAmount: 30, CreatedAt: time.Date(2024, 6, 23, 0, 0, 0, 0, time.UTC), Memo: nil},
{OperationID: 4, OperationType: "OperationTypePayment", TransactionID: 44, TransactionHash: "2af98496a86741c6a6814200e06027fd", FromAddress: "GACKTNR2QQXU", ToAddress: "GBZ5KUZHAAVI", SrcAssetCode: "USDC", SrcAssetIssuer: "GAHLU7PDIQMZ", SrcAmount: 40, DestAssetCode: "USDC", DestAssetIssuer: "GAHLU7PDIQMZ", DestAmount: 40, CreatedAt: time.Date(2024, 6, 24, 0, 0, 0, 0, time.UTC), Memo: nil},
{OperationID: 5, OperationType: "OperationTypePayment", TransactionID: 55, TransactionHash: "edfab36f9f104c4fb74b549de44cfbcc", FromAddress: "GA4CMYJEC5W5", ToAddress: "GAZ37ZO4TU3H", SrcAssetCode: "USDC", SrcAssetIssuer: "GAHLU7PDIQMZ", SrcAmount: 50, DestAssetCode: "USDC", DestAssetIssuer: "GAHLU7PDIQMZ", DestAmount: 50, CreatedAt: time.Date(2024, 6, 25, 0, 0, 0, 0, time.UTC), Memo: nil},
}
InsertTestPayments(t, ctx, dbPayments, dbConnectionPool)

t.Run("no_filter_desc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", 0, 0, DESC, 2)
require.NoError(t, err)

assert.False(t, prevExists)
assert.True(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[4],
dbPayments[3],
}, payments)
})

t.Run("no_filter_asc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", 0, 0, ASC, 2)
require.NoError(t, err)

assert.False(t, prevExists)
assert.True(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[0],
dbPayments[1],
}, payments)
})

t.Run("filter_address", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, dbPayments[1].FromAddress, 0, 0, DESC, 2)
require.NoError(t, err)

assert.False(t, prevExists)
assert.False(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[1],
}, payments)
})

t.Run("filter_after_id_desc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", 0, dbPayments[3].OperationID, DESC, 2)
require.NoError(t, err)

assert.True(t, prevExists)
assert.True(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[2],
dbPayments[1],
}, payments)
})

t.Run("filter_after_id_asc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", 0, dbPayments[3].OperationID, ASC, 2)
require.NoError(t, err)

assert.True(t, prevExists)
assert.False(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[4],
}, payments)
})

t.Run("filter_before_id_desc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", dbPayments[2].OperationID, 0, DESC, 2)
require.NoError(t, err)

assert.False(t, prevExists)
assert.True(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[4],
dbPayments[3],
}, payments)
})

t.Run("filter_before_id_asc", func(t *testing.T) {
payments, prevExists, nextExists, err := m.GetPaymentsPaginated(ctx, "", dbPayments[2].OperationID, 0, ASC, 2)
require.NoError(t, err)

assert.False(t, prevExists)
assert.True(t, nextExists)

assert.Equal(t, []Payment{
dbPayments[0],
dbPayments[1],
}, payments)
})

t.Run("filter_before_id_after_id_asc", func(t *testing.T) {
_, _, _, err := m.GetPaymentsPaginated(ctx, "", dbPayments[4].OperationID, dbPayments[2].OperationID, ASC, 2)
assert.ErrorContains(t, err, "at most one cursor may be provided, got afterId and beforeId")
})
}
36 changes: 36 additions & 0 deletions internal/data/query_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package data

import (
"context"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/stellar/wallet-backend/internal/db"
)

type SortOrder string

const (
ASC SortOrder = "ASC"
DESC SortOrder = "DESC"
)

func (o SortOrder) IsValid() bool {
return o == ASC || o == DESC
}

// PrepareNamedQuery prepares the given query replacing the named parameters with Postgres' bindvars.
// It returns an SQL Injection-safe query and the arguments array to be used alongside it.
func PrepareNamedQuery(ctx context.Context, connectionPool db.ConnectionPool, namedQuery string, argsMap map[string]interface{}) (string, []interface{}, error) {
query, args, err := sqlx.Named(namedQuery, argsMap)
if err != nil {
return "", nil, fmt.Errorf("replacing attributes with bindvars: %w", err)
}
query, args, err = sqlx.In(query, args...)
if err != nil {
return "", nil, fmt.Errorf("expanding slice arguments: %w", err)
}
query = connectionPool.Rebind(query)

return query, args, nil
}
1 change: 1 addition & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var _ Transaction = (*sqlx.Tx)(nil)
type SQLExecuter interface {
DriverName() string
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
NamedExecContext(ctx context.Context, query string, arg interface{}) (sql.Result, error)
GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
sqlx.PreparerContext
sqlx.QueryerContext
Expand Down
11 changes: 11 additions & 0 deletions internal/entities/pagination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package entities

type PaginationLinks struct {
Self string `json:"self"`
Prev string `json:"next"`
Next string `json:"prev"`
}

type Pagination struct {
Links PaginationLinks `json:"_links"`
}
8 changes: 1 addition & 7 deletions internal/serve/httphandler/account_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"net/http"

"github.com/stellar/go/support/http/httpdecode"
"github.com/stellar/go/support/render/httpjson"
"github.com/stellar/go/txnbuild"
"github.com/stellar/wallet-backend/internal/entities"
Expand Down Expand Up @@ -35,12 +34,7 @@ func (h AccountHandler) SponsorAccountCreation(rw http.ResponseWriter, req *http
ctx := req.Context()

var reqBody SponsorAccountCreationRequest
if err := httpdecode.DecodeJSON(req, &reqBody); err != nil {
httperror.BadRequest("", nil).Render(rw)
return
}

httpErr := ValidateRequestBody(ctx, reqBody)
httpErr := DecodeJSONAndValidate(ctx, req, &reqBody)
if httpErr != nil {
httpErr.Render(rw)
return
Expand Down
Loading

0 comments on commit 6be7be6

Please sign in to comment.