Skip to content

Commit 072b840

Browse files
committed
Register unregister filters
1 parent de28ddb commit 072b840

12 files changed

+1371
-63
lines changed

.mockery.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,9 @@ packages:
4040
github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller:
4141
interfaces:
4242
RPCClient:
43+
ORM:
44+
config:
45+
inpackage: True
46+
dir: "pkg/solana/logpoller"
47+
filename: mock_orm.go
48+
mockname: mockORM

pkg/solana/logpoller/filters.go

+316
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
package logpoller
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"iter"
8+
"maps"
9+
"sync"
10+
"sync/atomic"
11+
12+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
13+
)
14+
15+
type filters struct {
16+
orm ORM
17+
lggr logger.SugaredLogger
18+
19+
filtersByID map[int64]*Filter
20+
filtersByName map[string]int64
21+
filtersByAddress map[PublicKey]map[EventSignature]map[int64]struct{}
22+
filtersToBackfill map[int64]struct{}
23+
filtersToDelete map[int64]Filter
24+
filtersMutex sync.RWMutex
25+
loadedFilters atomic.Bool
26+
}
27+
28+
func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
29+
return &filters{
30+
orm: orm,
31+
lggr: lggr,
32+
}
33+
}
34+
35+
// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs.
36+
func (fl *filters) PruneFilters(ctx context.Context) error {
37+
err := fl.LoadFilters(ctx)
38+
if err != nil {
39+
return fmt.Errorf("failed to load filters: %w", err)
40+
}
41+
42+
fl.filtersMutex.Lock()
43+
filtersToDelete := fl.filtersToDelete
44+
fl.filtersToDelete = make(map[int64]Filter)
45+
fl.filtersMutex.Unlock()
46+
47+
if len(filtersToDelete) == 0 {
48+
return nil
49+
}
50+
51+
err = fl.orm.DeleteFilters(ctx, filtersToDelete)
52+
if err != nil {
53+
fl.filtersMutex.Lock()
54+
defer fl.filtersMutex.Unlock()
55+
maps.Copy(fl.filtersToDelete, filtersToDelete)
56+
return fmt.Errorf("failed to delete filters: %w", err)
57+
}
58+
59+
return nil
60+
}
61+
62+
// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address
63+
// that matches filter.EventSig signature will be captured starting from filter.StartingBlock.
64+
// The filter may be unregistered later by filter.Name.
65+
// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if
66+
// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter.
67+
// Otherwise, updates remaining fields and schedules backfill.
68+
// Warnings/debug information is keyed by filter name.
69+
func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error {
70+
if len(filter.Name) == 0 {
71+
return errors.New("name is required")
72+
}
73+
74+
err := fl.LoadFilters(ctx)
75+
if err != nil {
76+
return fmt.Errorf("failed to load filters: %w", err)
77+
}
78+
79+
fl.filtersMutex.Lock()
80+
defer fl.filtersMutex.Unlock()
81+
82+
filter.IsBackfilled = false
83+
if existingFilterID, ok := fl.filtersByName[filter.Name]; ok {
84+
existingFilter := fl.filtersByID[existingFilterID]
85+
if !existingFilter.MatchSameLogs(filter) {
86+
return ErrFilterNameConflict
87+
}
88+
if existingFilter.IsBackfilled {
89+
// if existing filter was already backfilled, but starting block was higher we need to backfill filter again
90+
filter.IsBackfilled = existingFilter.StartingBlock <= filter.StartingBlock
91+
}
92+
93+
fl.removeFilterFromIndexes(*existingFilter)
94+
}
95+
96+
filterID, err := fl.orm.InsertFilter(ctx, filter)
97+
if err != nil {
98+
return fmt.Errorf("failed to insert filter: %w", err)
99+
}
100+
101+
filter.ID = filterID
102+
103+
fl.filtersByName[filter.Name] = filter.ID
104+
fl.filtersByID[filter.ID] = &filter
105+
filtersForAddress, ok := fl.filtersByAddress[filter.Address]
106+
if !ok {
107+
filtersForAddress = make(map[EventSignature]map[int64]struct{})
108+
fl.filtersByAddress[filter.Address] = filtersForAddress
109+
}
110+
111+
filtersForEventSig, ok := filtersForAddress[filter.EventSig]
112+
if !ok {
113+
filtersForEventSig = make(map[int64]struct{})
114+
filtersForAddress[filter.EventSig] = filtersForEventSig
115+
}
116+
117+
filtersForEventSig[filter.ID] = struct{}{}
118+
if !filter.IsBackfilled {
119+
fl.filtersToBackfill[filter.ID] = struct{}{}
120+
}
121+
return nil
122+
}
123+
124+
// UnregisterFilter will mark the filter with the given name for pruning and async prune all corresponding logs.
125+
// If the name does not exist, it will log an error but not return an error.
126+
// Warnings/debug information is keyed by filter name.
127+
func (fl *filters) UnregisterFilter(ctx context.Context, name string) error {
128+
err := fl.LoadFilters(ctx)
129+
if err != nil {
130+
return fmt.Errorf("failed to load filters: %w", err)
131+
}
132+
133+
fl.filtersMutex.Lock()
134+
defer fl.filtersMutex.Unlock()
135+
136+
filterID, ok := fl.filtersByName[name]
137+
if !ok {
138+
fl.lggr.Warnw("Filter not found in filtersByName", "name", name)
139+
return nil
140+
}
141+
142+
filter, ok := fl.filtersByID[filterID]
143+
if !ok {
144+
fl.lggr.Errorw("Filter not found in filtersByID", "id", filterID, "name", name)
145+
return nil
146+
}
147+
148+
if err := fl.orm.MarkFilterDeleted(ctx, filter.ID); err != nil {
149+
return fmt.Errorf("failed to mark filter deleted: %w", err)
150+
}
151+
152+
fl.removeFilterFromIndexes(*filter)
153+
154+
fl.filtersToDelete[filter.ID] = *filter
155+
return nil
156+
}
157+
158+
func (fl *filters) removeFilterFromIndexes(filter Filter) {
159+
delete(fl.filtersByName, filter.Name)
160+
delete(fl.filtersToBackfill, filter.ID)
161+
delete(fl.filtersByID, filter.ID)
162+
163+
filtersForAddress, ok := fl.filtersByAddress[filter.Address]
164+
if !ok {
165+
fl.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address)
166+
return
167+
}
168+
169+
filtersForEventSig, ok := filtersForAddress[filter.EventSig]
170+
if !ok {
171+
fl.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address)
172+
return
173+
}
174+
175+
delete(filtersForEventSig, filter.ID)
176+
if len(filtersForEventSig) == 0 {
177+
delete(filtersForAddress, filter.EventSig)
178+
}
179+
180+
if len(filtersForAddress) == 0 {
181+
delete(fl.filtersByAddress, filter.Address)
182+
}
183+
}
184+
185+
// MatchingFilters - returns iterator to go through all matching filters.
186+
// Requires LoadFilters to be called at least once.
187+
func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] {
188+
if !fl.loadedFilters.Load() {
189+
fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters")
190+
return nil
191+
}
192+
return func(yield func(Filter) bool) {
193+
fl.filtersMutex.RLock()
194+
defer fl.filtersMutex.RUnlock()
195+
filters, ok := fl.filtersByAddress[addr]
196+
if !ok {
197+
return
198+
}
199+
200+
for filterID := range filters[eventSignature] {
201+
filter, ok := fl.filtersByID[filterID]
202+
if !ok {
203+
fl.lggr.Errorw("expected filter to exist in filtersByID", "filterID", filterID)
204+
continue
205+
}
206+
if !yield(*filter) {
207+
return
208+
}
209+
}
210+
}
211+
}
212+
213+
// GetFiltersToBackfill - returns copy of backfill queue
214+
// Requires LoadFilters to be called at least once.
215+
func (fl *filters) GetFiltersToBackfill() []Filter {
216+
if !fl.loadedFilters.Load() {
217+
fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters")
218+
return nil
219+
}
220+
fl.filtersMutex.Lock()
221+
defer fl.filtersMutex.Unlock()
222+
result := make([]Filter, 0, len(fl.filtersToBackfill))
223+
for filterID := range fl.filtersToBackfill {
224+
filter, ok := fl.filtersByID[filterID]
225+
if !ok {
226+
fl.lggr.Errorw("expected filter to exist in filtersByID", "filterID", filterID)
227+
continue
228+
}
229+
result = append(result, *filter)
230+
}
231+
232+
return result
233+
}
234+
235+
func (fl *filters) MarkFilterBackfilled(ctx context.Context, filterID int64) error {
236+
fl.filtersMutex.Lock()
237+
defer fl.filtersMutex.Unlock()
238+
filter, ok := fl.filtersByID[filterID]
239+
if !ok {
240+
return fmt.Errorf("filter %d not found", filterID)
241+
}
242+
err := fl.orm.MarkFilterBackfilled(ctx, filterID)
243+
if err != nil {
244+
return fmt.Errorf("failed to mark filter backfilled: %w", err)
245+
}
246+
247+
filter.IsBackfilled = true
248+
delete(fl.filtersToBackfill, filter.ID)
249+
return nil
250+
}
251+
252+
// LoadFilters - loads filters from database. Can be called multiple times without side effects.
253+
func (fl *filters) LoadFilters(ctx context.Context) error {
254+
if fl.loadedFilters.Load() {
255+
return nil
256+
}
257+
258+
fl.lggr.Debugw("Loading filters from db")
259+
fl.filtersMutex.Lock()
260+
defer fl.filtersMutex.Unlock()
261+
// reset filters' indexes to ensure we do not have partial data from the previous run
262+
fl.filtersByID = make(map[int64]*Filter)
263+
fl.filtersByName = make(map[string]int64)
264+
fl.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]struct{})
265+
fl.filtersToBackfill = make(map[int64]struct{})
266+
fl.filtersToDelete = make(map[int64]Filter)
267+
268+
filters, err := fl.orm.SelectFilters(ctx)
269+
if err != nil {
270+
return fmt.Errorf("failed to select filters from db: %w", err)
271+
}
272+
273+
for i := range filters {
274+
filter := filters[i]
275+
if filter.IsDeleted {
276+
fl.filtersToDelete[filter.ID] = filter
277+
continue
278+
}
279+
280+
fl.filtersByID[filter.ID] = &filter
281+
282+
if _, ok := fl.filtersByName[filter.Name]; ok {
283+
errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name)
284+
fl.lggr.Critical(errMsg)
285+
return errors.New(errMsg)
286+
}
287+
288+
fl.filtersByName[filter.Name] = filter.ID
289+
filtersForAddress, ok := fl.filtersByAddress[filter.Address]
290+
if !ok {
291+
filtersForAddress = make(map[EventSignature]map[int64]struct{})
292+
fl.filtersByAddress[filter.Address] = filtersForAddress
293+
}
294+
295+
filtersForEventSig, ok := filtersForAddress[filter.EventSig]
296+
if !ok {
297+
filtersForEventSig = make(map[int64]struct{})
298+
filtersForAddress[filter.EventSig] = filtersForEventSig
299+
}
300+
301+
if _, ok := filtersForEventSig[filter.ID]; ok {
302+
errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique ID: %d ", filter.ID)
303+
fl.lggr.Critical(errMsg)
304+
return errors.New(errMsg)
305+
}
306+
307+
filtersForEventSig[filter.ID] = struct{}{}
308+
if !filter.IsBackfilled {
309+
fl.filtersToBackfill[filter.ID] = struct{}{}
310+
}
311+
}
312+
313+
fl.loadedFilters.Store(true)
314+
315+
return nil
316+
}

0 commit comments

Comments
 (0)