Skip to content

Commit 1c74a3b

Browse files
committed
Add sequence # tracking, fix floating point encoding, add tests for NewIndexedValue
1 parent 05aceff commit 1c74a3b

File tree

7 files changed

+566
-10
lines changed

7 files changed

+566
-10
lines changed

pkg/solana/logpoller/filters.go

+11
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type filters struct {
2929
loadedFilters atomic.Bool
3030
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters
3131
knownDiscriminators map[string]uint // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator
32+
seqNums map[int64]int64
3233
}
3334

3435
func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
@@ -38,6 +39,11 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
3839
}
3940
}
4041

42+
func (fl *filters) IncrementSeqNums(filterID int64) int64 {
43+
fl.seqNums[filterID]++
44+
return fl.seqNums[filterID]
45+
}
46+
4147
// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs.
4248
func (fl *filters) PruneFilters(ctx context.Context) error {
4349
err := fl.LoadFilters(ctx)
@@ -385,6 +391,11 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
385391
}
386392
}
387393

394+
fl.seqNums, err = fl.orm.SelectSeqNums(ctx)
395+
if err != nil {
396+
return fmt.Errorf("failed to select sequence numbers from db: %w", err)
397+
}
398+
388399
fl.loadedFilters.Store(true)
389400

390401
return nil

pkg/solana/logpoller/log_poller.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type ORM interface {
3030
MarkFilterDeleted(ctx context.Context, id int64) (err error)
3131
MarkFilterBackfilled(ctx context.Context, id int64) (err error)
3232
InsertLogs(context.Context, []Log) (err error)
33+
SelectSeqNums(ctx context.Context) (map[int64]int64, error)
3334
}
3435

3536
type ILogPoller interface {
@@ -134,10 +135,7 @@ func (lp *LogPoller) Process(programEvent ProgramEvent) (err error) {
134135
subKeyValues = append(subKeyValues, indexedVal)
135136
}
136137

137-
lp.seqNums[filter.ID]++
138-
log.SequenceNum = lp.seqNums
139-
140-
// TODO: fill in, and keep track of SequenceNumber for each filter. (Initialize from db on LoadFilters, then increment each time?)
138+
log.SequenceNum = lp.filters.IncrementSeqNums(filter.ID)
141139

142140
expiresAt := time.Now() // TODO: account for possible discrepencies in time? Seems like retention should be passed directly to ORM
143141
expiresAt.Add(filter.Retention)

pkg/solana/logpoller/mock_orm.go

+58
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/solana/logpoller/orm.go

+10
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address Public
204204
}
205205
return logs, nil
206206
}
207+
208+
func (o *DSORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) {
209+
seqNums := make(map[int64]int64)
210+
query := "SELECT id, MAX(sequence_num) FROM solana.logs WHERE chain_id=%s GROUP BY id"
211+
err := o.ds.SelectContext(ctx, &seqNums, query, o.chainID)
212+
if err != nil {
213+
return nil, err
214+
}
215+
return seqNums, nil
216+
}

0 commit comments

Comments
 (0)