Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-wombats-teach.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix bind reg filter before unreg & bind noop when no new addresses
41 changes: 41 additions & 0 deletions core/services/relay/evm/read/bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,45 @@ func TestBindingsRegistry(t *testing.T) {
mRdr1.AssertExpectations(t)
mReg.AssertExpectations(t)
})

t.Run("bind twice is noop", func(t *testing.T) {
t.Parallel()

mRdr0 := new(mocks.Reader)
mReg := new(mocks.Registrar)

named := read.NewBindingsRegistry()

// register is called once through RegisterAll and again in Bind
mRdr0.EXPECT().Register(mock.Anything).Return(nil)

mRdr0.EXPECT().Bind(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

mReg.EXPECT().HasFilter(mock.Anything).Return(false)
mReg.EXPECT().RegisterFilter(mock.Anything, mock.Anything).Return(nil).Times(4) //2 times per init + 1 time per bind(3) + 1 time per bind new address
// part of the init phase of chain reader
require.NoError(t, named.AddReader(contractName1, methodName1, mRdr0))
_ = named.SetFilter(contractName1, filterWithSigs)

// run within the start phase of chain reader
require.NoError(t, named.RegisterAll(context.Background(), mReg))

bindings := []commontypes.BoundContract{
{Address: "0x24", Name: contractName1},
{Address: "0x25", Name: contractName1},
{Address: "0x26", Name: contractName1},
}

// calling bind now should trigger register filter just once
_ = named.Bind(context.Background(), mReg, bindings)
_ = named.Bind(context.Background(), mReg, bindings)
_ = named.Bind(context.Background(), mReg, bindings)

// this will trigger register filter
bindings[0].Address = "0x99"
_ = named.Bind(context.Background(), mReg, bindings)

mRdr0.AssertExpectations(t)
mReg.AssertExpectations(t)
})
}
29 changes: 18 additions & 11 deletions core/services/relay/evm/read/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,20 @@ func (cb *contractBinding) AddReaderNamed(name string, rdr Reader) {

// Bind binds contract addresses to contract binding and registers the common contract polling filter.
func (cb *contractBinding) Bind(ctx context.Context, registrar Registrar, bindings ...common.Address) error {
if cb.isBound() {
if err := cb.Unregister(ctx, registrar); err != nil {
return err
}
}

for _, binding := range bindings {
if cb.bindingExists(binding) {
continue
}

cb.registrar.SetName(logpoller.FilterName(cb.name + "." + uuid.NewString()))
cb.registrar.AddAddress(binding)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is what I was referring to below. Previously, we were calling cb.Unregister which calls RemoveAddress before calling AddAddress, so it gets removed and then re-added--you can't add the same one twice. But now we skip the isBound so it can be added multiple times. Right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cb.Unregister() doesn't call RemoveAddress, we cant add address multiple times because of
if cb.bindingExists(binding) { continue }

cb.addBinding(binding)
}

// registerCalled during ChainReader start
if cb.registered() {
return cb.Register(ctx, registrar)
if !cb.registrar.Dirty() {
return nil
}

return nil
return cb.Update(ctx, registrar)
}

func (cb *contractBinding) BindReaders(ctx context.Context, addresses ...common.Address) error {
Expand Down Expand Up @@ -159,6 +151,21 @@ func (cb *contractBinding) Register(ctx context.Context, registrar Registrar) er
return nil
}

func (cb *contractBinding) Update(ctx context.Context, registrar Registrar) error {
name := logpoller.FilterName(cb.name + "." + uuid.NewString())

if !cb.registered() {
cb.registrar.SetName(name)
return nil
}

if !cb.registrar.HasEventSigs() {
return nil
}

return cb.registrar.Update(ctx, registrar, name)
}

func (cb *contractBinding) RegisterReaders(ctx context.Context) error {
cb.mu.RLock()
defer cb.mu.RUnlock()
Expand Down
36 changes: 23 additions & 13 deletions core/services/relay/evm/read/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,6 @@ func (b *EventBinding) GetDataWords() map[string]DataWordDetail {
}

func (b *EventBinding) Bind(ctx context.Context, bindings ...common.Address) error {
if b.hasBindings() {
// we are changing contract address reference, so we need to unregister old filter if it exists
if err := b.Unregister(ctx); err != nil {
return err
}
}

// filterRegisterer isn't required here because the event can also be polled for by the contractBinding common filter.
if b.registrar != nil {
b.registrar.SetName(logpoller.FilterName(fmt.Sprintf("%s.%s.%s", b.contractName, b.eventName, uuid.NewString())))
}

for _, binding := range bindings {
if b.isBound(binding) {
continue
Expand All @@ -156,10 +144,15 @@ func (b *EventBinding) Bind(ctx context.Context, bindings ...common.Address) err
}

b.addBinding(binding)

}

if b.registrar == nil || !b.registrar.Dirty() {
return nil
}

if b.registered() {
return b.Register(ctx)
return b.Update(ctx)
}

return nil
Expand Down Expand Up @@ -196,6 +189,23 @@ func (b *EventBinding) Unbind(ctx context.Context, bindings ...common.Address) e
return nil
}

func (b *EventBinding) Update(ctx context.Context) error {
b.mu.RLock()
defer b.mu.RUnlock()

name := logpoller.FilterName(fmt.Sprintf("%s.%s.%s", b.contractName, b.eventName, uuid.NewString()))

if b.registrar == nil {
return nil
}

if len(b.bound) == 0 {
return nil
}

return b.registrar.Update(ctx, b.lp, name)
}

func (b *EventBinding) Register(ctx context.Context) error {
b.mu.Lock()
defer b.mu.Unlock()
Expand Down
51 changes: 49 additions & 2 deletions core/services/relay/evm/read/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,44 @@ type syncedFilter struct {
// internal state properties
mu sync.RWMutex
filter logpoller.Filter

// identifies if filter was modified between updates
dirty bool
}

func newSyncedFilter() *syncedFilter {
return &syncedFilter{}
}

func (r *syncedFilter) Update(ctx context.Context, registrar Registrar, updatedName string) error {
r.mu.Lock()
defer r.mu.Unlock()

oldName := r.filter.Name
if !r.dirty {
return nil
}

r.filter.Name = updatedName

if err := r.register(ctx, registrar); err != nil {
return err
}

// filter updated successfully, it's not dirty anymore
r.dirty = false

return r.unregister(ctx, registrar, oldName)
}

func (r *syncedFilter) Register(ctx context.Context, registrar Registrar) error {
r.mu.RLock()
defer r.mu.RUnlock()

return r.register(ctx, registrar)
}

func (r *syncedFilter) register(ctx context.Context, registrar Registrar) error {
if !registrar.HasFilter(r.filter.Name) {
if err := registrar.RegisterFilter(ctx, r.filter); err != nil {
return FilterError{
Expand All @@ -49,11 +77,15 @@ func (r *syncedFilter) Unregister(ctx context.Context, registrar Registrar) erro
r.mu.RLock()
defer r.mu.RUnlock()

if !registrar.HasFilter(r.filter.Name) {
return r.unregister(ctx, registrar, r.filter.Name)
}

func (r *syncedFilter) unregister(ctx context.Context, registrar Registrar, name string) error {
if !registrar.HasFilter(name) {
return nil
}

if err := registrar.UnregisterFilter(ctx, r.filter.Name); err != nil {
if err := registrar.UnregisterFilter(ctx, name); err != nil {
return FilterError{
Err: fmt.Errorf("%w: %s", types.ErrInternal, err.Error()),
Action: "unregister",
Expand All @@ -68,27 +100,35 @@ func (r *syncedFilter) SetFilter(filter logpoller.Filter) {
r.mu.Lock()
defer r.mu.Unlock()

r.dirty = true

r.filter = filter
}

func (r *syncedFilter) SetName(name string) {
r.mu.Lock()
defer r.mu.Unlock()

r.dirty = true

r.filter.Name = name
}

func (r *syncedFilter) AddAddress(address common.Address) {
r.mu.Lock()
defer r.mu.Unlock()

r.dirty = true

r.filter.Addresses = append(r.filter.Addresses, address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we should have some sort of check here to make sure the address isn't already in the list (or perhaps, use a set instead of a list?).

I'm not sure how often we unregister and re-register the same filter, but if it's a lol then we'll keep appending more and more copies of the same address onto the list.

Copy link
Collaborator Author

@Unheilbar Unheilbar Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I noticed this is already handled by common and event readers (since they make precheck that address is bounded). So it won't keep appending copies at least in its current usage

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this PR yes, but with the introduction of this PR it becomes possible. I'll comment above on the place where the check is removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, it doesn't pose any serious problem so if you want to leave it as-is I'm okay with that.

}

func (r *syncedFilter) RemoveAddress(address common.Address) {
r.mu.Lock()
defer r.mu.Unlock()

r.dirty = true

var addrIdx int
for idx, addr := range r.filter.Addresses {
if addr.Hex() == address.Hex() {
Expand All @@ -107,6 +147,13 @@ func (r *syncedFilter) Count() int {
return len(r.filter.Addresses)
}

func (r *syncedFilter) Dirty() bool {
r.mu.RLock()
defer r.mu.RUnlock()

return r.dirty
}

func (r *syncedFilter) HasEventSigs() bool {
r.mu.RLock()
defer r.mu.RUnlock()
Expand Down
7 changes: 1 addition & 6 deletions core/services/relay/evm/read/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ func (l *lookup) addReadNameForContract(contract, readName string) {
l.mu.Lock()
defer l.mu.Unlock()

readNames, exists := l.contractReadNames[contract]
if !exists {
readNames = []string{}
}

l.contractReadNames[contract] = append(readNames, readName)
l.contractReadNames[contract] = append(l.contractReadNames[contract], readName)
}

func (l *lookup) bindAddressForContract(contract, address string) {
Expand Down
Loading