Skip to content

Commit

Permalink
Pull request 2291: AGDNS-2374-slog-client
Browse files Browse the repository at this point in the history
Squashed commit of the following:

commit e8e6dba
Merge: 9292837 41cce62
Author: Stanislav Chzhen <[email protected]>
Date:   Tue Oct 22 13:46:26 2024 +0300

    Merge branch 'master' into AGDNS-2374-slog-client

commit 9292837
Author: Stanislav Chzhen <[email protected]>
Date:   Tue Oct 15 14:30:00 2024 +0300

    client: imp tests

commit f29d8ed
Author: Stanislav Chzhen <[email protected]>
Date:   Mon Oct 14 15:03:08 2024 +0300

    client: imp docs

commit 0b4311a
Author: Stanislav Chzhen <[email protected]>
Date:   Fri Oct 11 19:12:50 2024 +0300

    all: imp code

commit 1ad99ee
Author: Stanislav Chzhen <[email protected]>
Date:   Thu Oct 10 20:59:46 2024 +0300

    all: slog client
  • Loading branch information
schzhn committed Oct 22, 2024
1 parent 41cce62 commit e529d29
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 123 deletions.
17 changes: 11 additions & 6 deletions internal/aghtest/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func (s *ServiceWithConfig[ConfigType]) Config() (c ConfigType) {
// AddressProcessor is a fake [client.AddressProcessor] implementation for
// tests.
type AddressProcessor struct {
OnProcess func(ip netip.Addr)
OnProcess func(ctx context.Context, ip netip.Addr)
OnClose func() (err error)
}

// Process implements the [client.AddressProcessor] interface for
// *AddressProcessor.
func (p *AddressProcessor) Process(ip netip.Addr) {
p.OnProcess(ip)
func (p *AddressProcessor) Process(ctx context.Context, ip netip.Addr) {
p.OnProcess(ctx, ip)
}

// Close implements the [client.AddressProcessor] interface for
Expand All @@ -107,13 +107,18 @@ func (p *AddressProcessor) Close() (err error) {

// AddressUpdater is a fake [client.AddressUpdater] implementation for tests.
type AddressUpdater struct {
OnUpdateAddress func(ip netip.Addr, host string, info *whois.Info)
OnUpdateAddress func(ctx context.Context, ip netip.Addr, host string, info *whois.Info)
}

// UpdateAddress implements the [client.AddressUpdater] interface for
// *AddressUpdater.
func (p *AddressUpdater) UpdateAddress(ip netip.Addr, host string, info *whois.Info) {
p.OnUpdateAddress(ip, host, info)
func (p *AddressUpdater) UpdateAddress(
ctx context.Context,
ip netip.Addr,
host string,
info *whois.Info,
) {
p.OnUpdateAddress(ctx, ip, host, info)
}

// Package dnsforward
Expand Down
54 changes: 35 additions & 19 deletions internal/client/addrproc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/AdguardTeam/AdGuardHome/internal/rdns"
"github.com/AdguardTeam/AdGuardHome/internal/whois"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/netutil"
)
Expand All @@ -22,7 +21,7 @@ const ErrClosed errors.Error = "use of closed address processor"

// AddressProcessor is the interface for types that can process clients.
type AddressProcessor interface {
Process(ip netip.Addr)
Process(ctx context.Context, ip netip.Addr)
Close() (err error)
}

Expand All @@ -33,7 +32,7 @@ type EmptyAddrProc struct{}
var _ AddressProcessor = EmptyAddrProc{}

// Process implements the [AddressProcessor] interface for EmptyAddrProc.
func (EmptyAddrProc) Process(_ netip.Addr) {}
func (EmptyAddrProc) Process(_ context.Context, _ netip.Addr) {}

// Close implements the [AddressProcessor] interface for EmptyAddrProc.
func (EmptyAddrProc) Close() (_ error) { return nil }
Expand Down Expand Up @@ -90,12 +89,15 @@ type DefaultAddrProcConfig struct {
type AddressUpdater interface {
// UpdateAddress updates information about an IP address, setting host (if
// not empty) and WHOIS information (if not nil).
UpdateAddress(ip netip.Addr, host string, info *whois.Info)
UpdateAddress(ctx context.Context, ip netip.Addr, host string, info *whois.Info)
}

// DefaultAddrProc processes incoming client addresses with rDNS and WHOIS, if
// configured, and updates that information in a client storage.
type DefaultAddrProc struct {
// logger is used to log the operation of address processor.
logger *slog.Logger

// clientIPsMu serializes closure of clientIPs and access to isClosed.
clientIPsMu *sync.Mutex

Expand Down Expand Up @@ -142,6 +144,7 @@ const (
// not be nil.
func NewDefaultAddrProc(c *DefaultAddrProcConfig) (p *DefaultAddrProc) {
p = &DefaultAddrProc{
logger: c.BaseLogger.With(slogutil.KeyPrefix, "addrproc"),
clientIPsMu: &sync.Mutex{},
clientIPs: make(chan netip.Addr, defaultQueueSize),
rdns: &rdns.Empty{},
Expand All @@ -164,10 +167,13 @@ func NewDefaultAddrProc(c *DefaultAddrProcConfig) (p *DefaultAddrProc) {
p.whois = newWHOIS(c.BaseLogger.With(slogutil.KeyPrefix, "whois"), c.DialContext)
}

go p.process(c.CatchPanics)
// TODO(s.chzhen): Pass context.
ctx := context.TODO()

go p.process(ctx, c.CatchPanics)

for _, ip := range c.InitialAddresses {
p.Process(ip)
p.Process(ctx, ip)
}

return p
Expand Down Expand Up @@ -210,7 +216,7 @@ func newWHOIS(logger *slog.Logger, dialFunc aghnet.DialContextFunc) (w whois.Int
var _ AddressProcessor = (*DefaultAddrProc)(nil)

// Process implements the [AddressProcessor] interface for *DefaultAddrProc.
func (p *DefaultAddrProc) Process(ip netip.Addr) {
func (p *DefaultAddrProc) Process(ctx context.Context, ip netip.Addr) {
p.clientIPsMu.Lock()
defer p.clientIPsMu.Unlock()

Expand All @@ -222,38 +228,42 @@ func (p *DefaultAddrProc) Process(ip netip.Addr) {
case p.clientIPs <- ip:
// Go on.
default:
log.Debug("clients: ip channel is full; len: %d", len(p.clientIPs))
p.logger.DebugContext(ctx, "ip channel is full", "len", len(p.clientIPs))
}
}

// process processes the incoming client IP-address information. It is intended
// to be used as a goroutine. Once clientIPs is closed, process exits.
func (p *DefaultAddrProc) process(catchPanics bool) {
func (p *DefaultAddrProc) process(ctx context.Context, catchPanics bool) {
if catchPanics {
defer log.OnPanic("addrProcessor.process")
defer slogutil.RecoverAndLog(ctx, p.logger)
}

log.Info("clients: processing addresses")

ctx := context.TODO()
p.logger.InfoContext(ctx, "processing addresses")

for ip := range p.clientIPs {
host := p.processRDNS(ctx, ip)
info := p.processWHOIS(ctx, ip)

p.addrUpdater.UpdateAddress(ip, host, info)
p.addrUpdater.UpdateAddress(ctx, ip, host, info)
}

log.Info("clients: finished processing addresses")
p.logger.InfoContext(ctx, "finished processing addresses")
}

// processRDNS resolves the clients' IP addresses using reverse DNS. host is
// empty if there were errors or if the information hasn't changed.
func (p *DefaultAddrProc) processRDNS(ctx context.Context, ip netip.Addr) (host string) {
start := time.Now()
log.Debug("clients: processing %s with rdns", ip)
p.logger.DebugContext(ctx, "processing rdns", "ip", ip)
defer func() {
log.Debug("clients: finished processing %s with rdns in %s", ip, time.Since(start))
p.logger.DebugContext(
ctx,
"finished processing rdns",
"ip", ip,
"host", host,
"elapsed", time.Since(start),
)
}()

ok := p.shouldResolve(ip)
Expand All @@ -280,9 +290,15 @@ func (p *DefaultAddrProc) shouldResolve(ip netip.Addr) (ok bool) {
// hasn't changed.
func (p *DefaultAddrProc) processWHOIS(ctx context.Context, ip netip.Addr) (info *whois.Info) {
start := time.Now()
log.Debug("clients: processing %s with whois", ip)
p.logger.DebugContext(ctx, "processing whois", "ip", ip)
defer func() {
log.Debug("clients: finished processing %s with whois in %s", ip, time.Since(start))
p.logger.DebugContext(
ctx,
"finished processing whois",
"ip", ip,
"whois", info,
"elapsed", time.Since(start),
)
}()

// TODO(s.chzhen): Move the timeout logic from WHOIS configuration to the
Expand Down
17 changes: 11 additions & 6 deletions internal/client/addrproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestEmptyAddrProc(t *testing.T) {
p := client.EmptyAddrProc{}

assert.NotPanics(t, func() {
p.Process(testIP)
ctx := testutil.ContextWithTimeout(t, testTimeout)
p.Process(ctx, testIP)
})

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -120,7 +121,8 @@ func TestDefaultAddrProc_Process_rDNS(t *testing.T) {
})
testutil.CleanupAndRequireSuccess(t, p.Close)

p.Process(tc.ip)
ctx := testutil.ContextWithTimeout(t, testTimeout)
p.Process(ctx, tc.ip)

if !tc.wantUpd {
return
Expand All @@ -146,8 +148,8 @@ func newOnUpdateAddress(
ips chan<- netip.Addr,
hosts chan<- string,
infos chan<- *whois.Info,
) (f func(ip netip.Addr, host string, info *whois.Info)) {
return func(ip netip.Addr, host string, info *whois.Info) {
) (f func(ctx context.Context, ip netip.Addr, host string, info *whois.Info)) {
return func(ctx context.Context, ip netip.Addr, host string, info *whois.Info) {
if !want && (host != "" || info != nil) {
panic(fmt.Errorf("got unexpected update for %v with %q and %v", ip, host, info))
}
Expand Down Expand Up @@ -230,7 +232,8 @@ func TestDefaultAddrProc_Process_WHOIS(t *testing.T) {
})
testutil.CleanupAndRequireSuccess(t, p.Close)

p.Process(testIP)
ctx := testutil.ContextWithTimeout(t, testTimeout)
p.Process(ctx, testIP)

if !tc.wantUpd {
return
Expand All @@ -251,7 +254,9 @@ func TestDefaultAddrProc_Process_WHOIS(t *testing.T) {
func TestDefaultAddrProc_Close(t *testing.T) {
t.Parallel()

p := client.NewDefaultAddrProc(&client.DefaultAddrProcConfig{})
p := client.NewDefaultAddrProc(&client.DefaultAddrProcConfig{
BaseLogger: slogutil.NewDiscardLogger(),
})

err := p.Close()
assert.NoError(t, err)
Expand Down
8 changes: 5 additions & 3 deletions internal/client/persistent.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package client

import (
"context"
"encoding"
"fmt"
"log/slog"
"net"
"net/netip"
"slices"
Expand All @@ -12,7 +14,7 @@ import (
"github.com/AdguardTeam/dnsproxy/proxy"
"github.com/AdguardTeam/dnsproxy/upstream"
"github.com/AdguardTeam/golibs/errors"
"github.com/AdguardTeam/golibs/log"
"github.com/AdguardTeam/golibs/logutil/slogutil"
"github.com/AdguardTeam/golibs/netutil"
"github.com/google/uuid"
)
Expand Down Expand Up @@ -134,7 +136,7 @@ type Persistent struct {

// validate returns an error if persistent client information contains errors.
// allTags must be sorted.
func (c *Persistent) validate(allTags []string) (err error) {
func (c *Persistent) validate(ctx context.Context, l *slog.Logger, allTags []string) (err error) {
switch {
case c.Name == "":
return errors.Error("empty name")
Expand All @@ -151,7 +153,7 @@ func (c *Persistent) validate(allTags []string) (err error) {

err = conf.Close()
if err != nil {
log.Error("client: closing upstream config: %s", err)
l.ErrorContext(ctx, "client: closing upstream config", slogutil.KeyError, err)
}

for _, t := range c.Tags {
Expand Down
Loading

0 comments on commit e529d29

Please sign in to comment.