diff --git a/sonic/pool_test.go b/sonic/pool_test.go index 0a01daf..9a48cf7 100644 --- a/sonic/pool_test.go +++ b/sonic/pool_test.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "net" - "sync" "testing" "time" @@ -14,7 +13,6 @@ import ( func TestPool_Reconnect(t *testing.T) { host, port, pass := getSonicConfig(t) - closeAllCond := sync.NewCond(&sync.Mutex{}) proxyLn, proxyDoneCh := runTCPProxy(t, fmt.Sprintf("%s:%d", host, port), // Target addr. "127.0.0.1:0", // Proxy addr. @@ -22,7 +20,12 @@ func TestPool_Reconnect(t *testing.T) { proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String()) - ing, err := sonic.NewIngester(proxyHost, proxyPort, pass) + ing, err := sonic.NewIngester( + proxyHost, + proxyPort, + pass, + sonic.OptionPoolPingThreshold(time.Nanosecond), + ) if err != nil { t.Fatal("NewIngester", err) } @@ -34,10 +37,6 @@ func TestPool_Reconnect(t *testing.T) { t.Fatal("Ping", err) } - closeAllCond.L.Lock() - closeAllCond.Broadcast() - closeAllCond.L.Unlock() - err = ing.Ping() if err != nil { t.Fatal("Ping", err) @@ -85,6 +84,80 @@ func TestPool_Reconnect(t *testing.T) { } } +func TestPool_Reconnect_Threshold(t *testing.T) { + host, port, pass := getSonicConfig(t) + + proxyLn, proxyDoneCh := runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + "127.0.0.1:0", // Proxy addr. + ) + + proxyHost, proxyPort := mustSplitHostPort(t, proxyLn.Addr().String()) + + ing, err := sonic.NewIngester( + proxyHost, + proxyPort, + pass, + sonic.OptionPoolPingThreshold(time.Minute), + ) + if err != nil { + t.Fatal("NewIngester", err) + } + + // Connection healthy, ping should work. + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + err = ing.Ping() + if err != nil { + t.Fatal("Ping", err) + } + + // Close connection, ping should not work. + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } + + err = ing.Ping() + if err == nil { + t.Fatal("Ping", err) + } + + // Reconnect in threshold, ping still should not work. + + proxyLn, proxyDoneCh = runTCPProxy(t, + fmt.Sprintf("%s:%d", host, port), // Target addr. + fmt.Sprintf("%s:%d", proxyHost, proxyPort), // Proxy addr. + ) + + err = ing.Ping() + if err == nil { + t.Fatal("Ping", err) + } + + err = proxyLn.Close() + if err != nil { + t.Fatal("Close", err) + } + + select { + case <-proxyDoneCh: + case <-time.After(2 * time.Second): + t.Fatal("Timeout") + } +} + func runTCPProxy( tb testing.TB, targetAddr string,