Skip to content

Commit

Permalink
bfg,bss: add pagination to PoP txs and payouts (#196)
Browse files Browse the repository at this point in the history
* allow pagination of pop payouts

allowing pagination of flow bss.popPayouts --> bfg.popTxsForL2Block

* small cleanup

* updated error handling to be single line

* pr feedback

* checking number of unique results

* deep equal arrays of miner addresses in tests
  • Loading branch information
ClaytonNorthey92 authored Oct 10, 2024
1 parent ee45b5c commit 06ca6ef
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 10 deletions.
1 change: 1 addition & 0 deletions api/bfgapi/bfgapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ type BitcoinUTXOsResponse struct {

type PopTxsForL2BlockRequest struct {
L2Block api.ByteSlice `json:"l2_block"`
Page uint32 `json:"page,omitempty"`
}

type PopTxsForL2BlockResponse struct {
Expand Down
1 change: 1 addition & 0 deletions api/bssapi/bssapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type PopPayout struct {

type PopPayoutsRequest struct {
L2BlockForPayout api.ByteSlice `json:"l2_block_for_payout"`
Page uint32 `json:"page,omitempty"`

// these are unused at this point, they will be used in the future to determine the
// total payout to miners
Expand Down
2 changes: 1 addition & 1 deletion database/bfgd/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type Database interface {
BtcBlocksHeightsWithNoChildren(ctx context.Context) ([]uint64, error)

// Pop data
PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool) ([]PopBasis, error)
PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool, page uint32) ([]PopBasis, error)
PopBasisInsertFull(ctx context.Context, pb *PopBasis) error
PopBasisInsertPopMFields(ctx context.Context, pb *PopBasis) error
PopBasisUpdateBTCFields(ctx context.Context, pb *PopBasis) (int64, error)
Expand Down
8 changes: 7 additions & 1 deletion database/bfgd/database_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func TestDatabasePostgres(t *testing.T) {
}

// Pop basis get half
pbHalfOut, err := db.PopBasisByL2KeystoneAbrevHash(ctx, l2KAH, true)
pbHalfOut, err := db.PopBasisByL2KeystoneAbrevHash(ctx, l2KAH, true, 0)
if err != nil {
t.Fatalf("Failed to get pop basis: %v", err)
}
Expand Down Expand Up @@ -981,6 +981,7 @@ func TestPopBasisInsertNilMerklePath(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1026,6 +1027,7 @@ func TestPopBasisInsertNotNilMerklePath(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1070,6 +1072,7 @@ func TestPopBasisInsertNilMerklePathFromPopM(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1195,6 +1198,7 @@ func TestPopBasisUpdateOneExistsWithNonNullBTCFields(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand All @@ -1217,6 +1221,7 @@ func TestPopBasisUpdateOneExistsWithNonNullBTCFields(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1305,6 +1310,7 @@ func TestPopBasisUpdateOneExistsWithNullBTCFields(t *testing.T) {
ctx,
[32]byte(fillOutBytes("l2keystoneabrevhash", 32)),
false,
0,
)
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 12 additions & 2 deletions database/bfgd/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,13 @@ func (p *pgdb) PopBasisInsertFull(ctx context.Context, pb *bfgd.PopBasis) error
return nil
}

func (p *pgdb) PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool) ([]bfgd.PopBasis, error) {
func (p *pgdb) PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte, excludeUnconfirmed bool, page uint32) ([]bfgd.PopBasis, error) {
// can change later as needed
limit := uint32(100)

// start at page 0
offset := limit * page

q := `
SELECT
id,
Expand All @@ -495,9 +501,13 @@ func (p *pgdb) PopBasisByL2KeystoneAbrevHash(ctx context.Context, aHash [32]byte
q += " AND btc_block_hash IS NOT NULL"
}

// use ORDER BY so pagination maintains an order of some sort (so we don't
// respond multiple times with the same record on different pages)
q += " ORDER BY id OFFSET $2 LIMIT $3"

pbs := []bfgd.PopBasis{}
log.Infof("querying for hash: %v", database.ByteArray(aHash[:]))
rows, err := p.db.QueryContext(ctx, q, aHash[:])
rows, err := p.db.QueryContext(ctx, q, aHash[:], offset, limit)
if err != nil {
return nil, err
}
Expand Down
190 changes: 187 additions & 3 deletions e2e/e2e_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/hemilabs/heminetwork/api/protocol"
"github.com/hemilabs/heminetwork/database/bfgd"
"github.com/hemilabs/heminetwork/database/bfgd/postgres"
"github.com/hemilabs/heminetwork/ethereum"
"github.com/hemilabs/heminetwork/hemi"
"github.com/hemilabs/heminetwork/hemi/electrs"
"github.com/hemilabs/heminetwork/hemi/pop"
Expand Down Expand Up @@ -1629,7 +1630,7 @@ func TestBitcoinBroadcastDuplicate(t *testing.T) {
publicKeyUncompressed := publicKey.SerializeUncompressed()

// 3
popBases, err := db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), false)
popBases, err := db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), false, 0)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1840,7 +1841,7 @@ loop:
case <-lctx.Done():
break loop
case <-time.After(1 * time.Second):
popBases, err = db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), false)
popBases, err = db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), false, 0)
if len(popBases) > 0 {
break loop
}
Expand Down Expand Up @@ -2010,7 +2011,7 @@ loop:
case <-lctx.Done():
break loop
case <-time.After(1 * time.Second):
popBases, err = db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), true)
popBases, err = db.PopBasisByL2KeystoneAbrevHash(ctx, [32]byte(hemi.L2KeystoneAbbreviate(l2Keystone).Hash()), true, 0)
if len(popBases) > 0 {
break loop
}
Expand Down Expand Up @@ -2283,6 +2284,189 @@ func TestPopPayouts(t *testing.T) {
}
}

func TestPopPayoutsMultiplePages(t *testing.T) {
db, pgUri, sdb, cleanup := createTestDB(context.Background(), t)
defer func() {
db.Close()
sdb.Close()
cleanup()
}()

ctx, cancel := defaultTestContext()
defer cancel()

includedL2Keystone := hemi.L2Keystone{
Version: 1,
L1BlockNumber: 11,
L2BlockNumber: 22,
ParentEPHash: fillOutBytes("parentephash", 32),
PrevKeystoneEPHash: fillOutBytes("prevkeystoneephash", 32),
StateRoot: fillOutBytes("stateroot", 32),
EPHash: fillOutBytes("ephash", 32),
}

btcHeaderHash := fillOutBytes("btcheaderhash", 32)

btcBlock := bfgd.BtcBlock{
Hash: btcHeaderHash,
Header: fillOutBytes("btcheader", 80),
Height: 99,
}

if err := db.BtcBlockInsert(ctx, &btcBlock); err != nil {
t.Fatal(err)
}

// insert 151 pop payouts to different miners, get the first 3 pages,
// we expect result counts like so : 100, 51, 0
var txIndex uint64 = 1

addresses := []string{}

for range 151 {
privateKey, err := dcrsecp256k1.GeneratePrivateKeyFromRand(rand.Reader)
if err != nil {
t.Fatal(err)
}

address := ethereum.AddressFromPrivateKey(privateKey)
addresses = append(addresses, address.String())

publicKey := privateKey.PubKey()
publicKeyUncompressed := publicKey.SerializeUncompressed()

txIndex++
popBasis := bfgd.PopBasis{
BtcTxId: fillOutBytes("btctxid1", 32),
BtcRawTx: []byte("btcrawtx1"),
PopTxId: fillOutBytes("poptxid1", 32),
L2KeystoneAbrevHash: hemi.L2KeystoneAbbreviate(includedL2Keystone).Hash(),
PopMinerPublicKey: publicKeyUncompressed,
BtcHeaderHash: btcHeaderHash,
BtcTxIndex: &txIndex,
}

if err := db.PopBasisInsertFull(ctx, &popBasis); err != nil {
t.Fatal(err)
}
}

_, _, bfgWsurl, _ := createBfgServer(ctx, t, pgUri, "", 1)

_, _, bssWsurl := createBssServer(ctx, t, bfgWsurl)

c, _, err := websocket.Dial(ctx, bssWsurl, nil)
if err != nil {
t.Fatal(err)
}
defer c.CloseNow()

assertPing(ctx, t, c, bssapi.CmdPingRequest)

bws := &bssWs{
conn: protocol.NewWSConn(c),
}

serializedL2Keystone := hemi.L2KeystoneAbbreviate(includedL2Keystone).Serialize()

receivedAddresses := []string{}

popPayoutsRequest := bssapi.PopPayoutsRequest{
L2BlockForPayout: serializedL2Keystone[:],
}

err = bssapi.Write(ctx, bws.conn, "someid", popPayoutsRequest)
if err != nil {
t.Fatal(err)
}

var v protocol.Message
if err := wsjson.Read(ctx, c, &v); err != nil {
t.Fatal(err)
}

if v.Header.Command != bssapi.CmdPopPayoutResponse {
t.Fatalf("received unexpected command: %s", v.Header.Command)
}

popPayoutsResponse := bssapi.PopPayoutsResponse{}
if err := json.Unmarshal(v.Payload, &popPayoutsResponse); err != nil {
t.Fatal(err)
}

if len(popPayoutsResponse.PopPayouts) != 100 {
t.Fatalf(
"expected first page to have 100 results, received %d",
len(popPayoutsResponse.PopPayouts),
)
}

for _, p := range popPayoutsResponse.PopPayouts {
receivedAddresses = append(receivedAddresses, p.MinerAddress.String())
}

popPayoutsRequest.Page = 1
err = bssapi.Write(ctx, bws.conn, "someid", popPayoutsRequest)
if err != nil {
t.Fatal(err)
}

if err := wsjson.Read(ctx, c, &v); err != nil {
t.Fatal(err)
}

if v.Header.Command != bssapi.CmdPopPayoutResponse {
t.Fatalf("received unexpected command: %s", v.Header.Command)
}

err = json.Unmarshal(v.Payload, &popPayoutsResponse)
if err != nil {
t.Fatal(err)
}

if len(popPayoutsResponse.PopPayouts) != 51 {
t.Fatalf(
"expected first page to have 51 results, received %d",
len(popPayoutsResponse.PopPayouts),
)
}

for _, p := range popPayoutsResponse.PopPayouts {
receivedAddresses = append(receivedAddresses, p.MinerAddress.String())
}

popPayoutsRequest.Page = 2
err = bssapi.Write(ctx, bws.conn, "someid", popPayoutsRequest)
if err != nil {
t.Fatal(err)
}

if err := wsjson.Read(ctx, c, &v); err != nil {
t.Fatal(err)
}

if v.Header.Command != bssapi.CmdPopPayoutResponse {
t.Fatalf("received unexpected command: %s", v.Header.Command)
}

if err := json.Unmarshal(v.Payload, &popPayoutsResponse); err != nil {
t.Fatal(err)
}

if len(popPayoutsResponse.PopPayouts) != 0 {
t.Fatalf(
"expected first page to have 0 results, received %d",
len(popPayoutsResponse.PopPayouts))
}

slices.Sort(addresses)
slices.Sort(receivedAddresses)

if diff := deep.Equal(addresses, receivedAddresses); len(diff) != 0 {
t.Fatalf("unexpected diff %v", diff)
}
}

func TestGetMostRecentL2BtcFinalitiesBSS(t *testing.T) {
db, pgUri, sdb, cleanup := createTestDB(context.Background(), t)
defer func() {
Expand Down
7 changes: 4 additions & 3 deletions service/bfg/bfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1201,16 +1201,17 @@ func (s *Server) handlePopTxsForL2Block(ctx context.Context, ptl2 *bfgapi.PopTxs
hash := hemi.HashSerializedL2KeystoneAbrev(ptl2.L2Block)
var h [32]byte
copy(h[:], hash)
popTxs, err := s.db.PopBasisByL2KeystoneAbrevHash(ctx, h, true)

response := &bfgapi.PopTxsForL2BlockResponse{}

popTxs, err := s.db.PopBasisByL2KeystoneAbrevHash(ctx, h, true, ptl2.Page)
if err != nil {
e := protocol.NewInternalErrorf("error getting pop basis: %w", err)
return &bfgapi.PopTxsForL2BlockResponse{
Error: e.ProtocolError(),
}, e
}

response := &bfgapi.PopTxsForL2BlockResponse{}
response.PopTxs = make([]bfgapi.PopTx, 0, len(popTxs))
for k := range popTxs {
response.PopTxs = append(response.PopTxs, bfgapi.PopTx{
BtcTxId: api.ByteSlice(popTxs[k].BtcTxId),
Expand Down
1 change: 1 addition & 0 deletions service/bss/bss.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (s *Server) handlePopPayoutsRequest(ctx context.Context, msg *bssapi.PopPay

popTxsForL2BlockRes, err := s.callBFG(ctx, bfgapi.PopTxsForL2BlockRequest{
L2Block: msg.L2BlockForPayout,
Page: msg.Page,
})
if err != nil {
e := protocol.NewInternalErrorf("pop tx for l2: block %w", err)
Expand Down

0 comments on commit 06ca6ef

Please sign in to comment.