Skip to content

Commit

Permalink
Fix reconnect on ping
Browse files Browse the repository at this point in the history
  • Loading branch information
Kryvchun committed Jun 29, 2022
1 parent 13bf620 commit 2d7bb13
Showing 1 changed file with 80 additions and 7 deletions.
87 changes: 80 additions & 7 deletions sonic/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net"
"sync"
"testing"
"time"

Expand All @@ -14,15 +13,19 @@ import (
func TestPool_Reconnect(t *testing.T) {
host, port, pass := getSonicConfig(t)

closeAllCond := sync.NewCond(&sync.Mutex{})
proxyLn, proxyDoneCh := runTCPProxy(t,
fmt.Sprintf("%s:%d", host, port), // Target addr.
"127.0.0.1:0", // Proxy addr.
)

proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String())

ing, err := sonic.NewIngester(proxyHost, proxyPort, pass)
ing, err := sonic.NewIngester(
proxyHost,
proxyPort,
pass,
sonic.OptionPoolPingThreshold(time.Nanosecond),
)
if err != nil {
t.Fatal("NewIngester", err)
}
Expand All @@ -34,10 +37,6 @@ func TestPool_Reconnect(t *testing.T) {
t.Fatal("Ping", err)
}

closeAllCond.L.Lock()
closeAllCond.Broadcast()
closeAllCond.L.Unlock()

err = ing.Ping()
if err != nil {
t.Fatal("Ping", err)
Expand Down Expand Up @@ -85,6 +84,80 @@ func TestPool_Reconnect(t *testing.T) {
}
}

func TestPool_Reconnect_Threshold(t *testing.T) {
host, port, pass := getSonicConfig(t)

proxyLn, proxyDoneCh := runTCPProxy(t,
fmt.Sprintf("%s:%d", host, port), // Target addr.
"127.0.0.1:0", // Proxy addr.
)

proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String())

ing, err := sonic.NewIngester(
proxyHost,
proxyPort,
pass,
sonic.OptionPoolPingThreshold(time.Minute),
)
if err != nil {
t.Fatal("NewIngester", err)
}

// Connection healthy, ping should work.

err = ing.Ping()
if err != nil {
t.Fatal("Ping", err)
}

err = ing.Ping()
if err != nil {
t.Fatal("Ping", err)
}

// Close connection, ping should not work.

err = proxyLn.Close()
if err != nil {
t.Fatal("Close", err)
}

select {
case <-proxyDoneCh:
case <-time.After(2 * time.Second):
t.Fatal("Timeout")
}

err = ing.Ping()
if err == nil {
t.Fatal("Ping", err)
}

// Reconnect in threshold, ping still should not work.

proxyLn, proxyDoneCh = runTCPProxy(t,
fmt.Sprintf("%s:%d", host, port), // Target addr.
fmt.Sprintf("%s:%d", proxyHost, proxyPort), // Proxy addr.
)

err = ing.Ping()
if err == nil {
t.Fatal("Ping", err)
}

err = proxyLn.Close()
if err != nil {
t.Fatal("Close", err)
}

select {
case <-proxyDoneCh:
case <-time.After(2 * time.Second):
t.Fatal("Timeout")
}
}

func runTCPProxy(
tb testing.TB,
targetAddr string,
Expand Down

0 comments on commit 2d7bb13

Please sign in to comment.