Skip to content

Commit

Permalink
prevent port number reuse with TTL-based caching (#4689)
Browse files Browse the repository at this point in the history
## Problem
In devstack and tests, we start multiple services (API, NATS,
publishers) that each need unique ports. Since we allocate all ports
before starting the services, calling GetFreePort multiple times in
quick succession could return the same port number, leading to service
startup failures.

## Solution
Added TTL-based port caching (5s default) to prevent GetFreePort from
returning recently allocated ports. This gives devstack time to actually
bind the ports before they can be reused.

The change is backwards compatible - existing GetFreePort calls
automatically get the new caching behavior.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced a new port allocation system with time-based reservations
for enhanced management of network ports.
- Added functionality to check if a port is open and ensure thread
safety during concurrent allocations.

- **Bug Fixes**
	- Improved the reliability of port allocation and reservation checks.

- **Tests**
- Expanded test suite to include new tests for eviction and reservation
mechanisms, maximum attempts, concurrent allocations, and availability
checks, enhancing overall test coverage.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
wdbaruni and coderabbitai[bot] authored Nov 5, 2024
1 parent de5a3ef commit c5a922c
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 53 deletions.
117 changes: 85 additions & 32 deletions pkg/lib/network/ports.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,93 @@
package network

import (
"fmt"
"net"
"strconv"
"sync"
"time"
)

// IsPortOpen checks if a port is open by attempting to listen on it. If the
// port is open, it returns true, otherwise it returns false. The port listen
// socket will be closed if the function was able to create it.
func IsPortOpen(port int) bool {
addr := net.JoinHostPort("", strconv.Itoa(port))
ln, err := net.Listen("tcp", addr)
if err != nil {
// There was a problem listening, the port is probably in use
return false
const (
defaultPortAllocatorTTL = 5 * time.Second
defaultMaxAttempts = 10
)

// PortAllocator manages thread-safe allocation of network ports with a time-based reservation system.
// Once a port is allocated, it won't be reallocated until after the TTL expires, helping prevent
// race conditions in concurrent port allocation scenarios.
type PortAllocator struct {
mu sync.Mutex
reservedPorts map[int]time.Time
ttl time.Duration
maxAttempts uint
}

var (
// globalAllocator is the package-level port allocator instance used by GetFreePort
globalAllocator = NewPortAllocator(defaultPortAllocatorTTL, defaultMaxAttempts)
)

// NewPortAllocator creates a new PortAllocator instance.
// ttl determines how long a port remains reserved after allocation.
// maxAttempts limits how many times we'll try to find an unreserved port before giving up.
func NewPortAllocator(ttl time.Duration, maxAttempts uint) *PortAllocator {
if maxAttempts == 0 {
maxAttempts = defaultMaxAttempts
}
return &PortAllocator{
reservedPorts: make(map[int]time.Time),
ttl: ttl,
maxAttempts: maxAttempts,
}
}

// We were able to use the port, so it is free, but we should close it
// first
_ = ln.Close()
return true
// GetFreePort returns an available port using the global port allocator.
// The returned port is guaranteed to not be reallocated by this package
// for the duration of the TTL (default 5 seconds).
func GetFreePort() (int, error) {
return globalAllocator.GetFreePort()
}

// GetFreePort returns an available port and reserves it for the duration of the TTL.
// If a port is already reserved but its TTL has expired, it may be returned if it's
// still available on the system. Returns error if unable to find an available port
// after maxAttempts tries.
func (pa *PortAllocator) GetFreePort() (int, error) {
pa.mu.Lock()
defer pa.mu.Unlock()

// Clean up expired reservations eagerly to prevent memory growth
now := time.Now()
for port, expiration := range pa.reservedPorts {
if now.After(expiration) {
delete(pa.reservedPorts, port)
}
}

for attempts := uint(0); attempts < pa.maxAttempts; attempts++ {
port, err := getFreePortFromSystem()
if err != nil {
return 0, fmt.Errorf("failed to get port from system: %w", err)
}

if _, reserved := pa.reservedPorts[port]; !reserved {
pa.reservedPorts[port] = now.Add(pa.ttl)
return port, nil
}
}

return 0, fmt.Errorf("failed to find an available port after %d attempts", pa.maxAttempts)
}

// GetFreePort returns a single available port by asking the operating
// system to pick one for us. Luckily ports are not re-used so after asking
// for a port number, we attempt to create a tcp listener.
// getFreePortFromSystem asks the operating system for an available port by creating
// a TCP listener with port 0, which triggers the OS to assign a random available port.
//
// Essentially the same code as https://github.com/phayes/freeport but we bind
// to 0.0.0.0 to ensure the port is free on all interfaces, and not just localhost.GetFreePort
// Ports must be unique for an address, not an entire system and so checking just localhost
// is not enough.
func GetFreePort() (int, error) {
func getFreePortFromSystem() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", ":0")
if err != nil {
return 0, err
Expand All @@ -44,20 +101,16 @@ func GetFreePort() (int, error) {
return l.Addr().(*net.TCPAddr).Port, nil
}

// GetFreePorts returns an array available ports by asking the operating
// system to pick one for us.
//
// Essentially the same code as https://github.com/phayes/freeport apart from
// the caveats described in GetFreePort.
func GetFreePorts(count int) ([]int, error) {
ports := []int{}

for i := 0; i < count; i++ {
port, err := GetFreePort()
if err != nil {
return nil, err
}
ports = append(ports, port)
// IsPortOpen checks if a specific port is available for use by attempting to create
// a TCP listener on that port. It returns true if the port is available, false otherwise.
// The caller should note that the port's availability may change immediately after
// this check returns.
func IsPortOpen(port int) bool {
addr := net.JoinHostPort("", strconv.Itoa(port))
ln, err := net.Listen("tcp", addr)
if err != nil {
return false
}
return ports, nil
ln.Close()
return true
}
128 changes: 107 additions & 21 deletions pkg/lib/network/ports_test.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,133 @@
//go:build unit || !integration

package network_test
package network

import (
"net"
"strconv"
"sync"
"testing"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/network"
"github.com/stretchr/testify/suite"
)

type FreePortTestSuite struct {
type PortAllocatorTestSuite struct {
suite.Suite
}

func TestFreePortTestSuite(t *testing.T) {
suite.Run(t, new(FreePortTestSuite))
func TestPortAllocatorTestSuite(t *testing.T) {
suite.Run(t, new(PortAllocatorTestSuite))
}

func (s *FreePortTestSuite) TestGetFreePort() {
port, err := network.GetFreePort()
// TestGetFreePort verifies that GetFreePort returns a usable port
func (s *PortAllocatorTestSuite) TestGetFreePort() {
port, err := GetFreePort()
s.Require().NoError(err)
s.NotEqual(0, port, "expected a non-zero port")

// Try to listen on the port
l, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port))
// Verify we can listen on the port
l, err := net.Listen("tcp", ":"+strconv.Itoa(port))
s.Require().NoError(err)
defer l.Close()
}

func (s *FreePortTestSuite) TestGetFreePorts() {
count := 3
ports, err := network.GetFreePorts(count)
// TestEvictionAndReservation tests both the TTL eviction and reservation mechanism
func (s *PortAllocatorTestSuite) TestEvictionAndReservation() {
now := time.Now()
allocator := &PortAllocator{
reservedPorts: map[int]time.Time{
8080: now.Add(-time.Second), // expired
8081: now.Add(time.Second), // not expired
8082: now.Add(-time.Second), // expired
},
ttl: time.Second,
maxAttempts: 10,
}

// Getting a free port should clean up expired entries
port, err := allocator.GetFreePort()
s.Require().NoError(err)

// Verify expired ports were cleaned up
s.Len(allocator.reservedPorts, 2) // port we just got plus 8081
_, hasPort := allocator.reservedPorts[8081]
s.True(hasPort, "non-expired port should still be present")

// New port should be reserved
_, hasNewPort := allocator.reservedPorts[port]
s.True(hasNewPort, "new port should be reserved")
}

// TestMaxAttempts verifies the retry limit when ports are reserved
func (s *PortAllocatorTestSuite) TestMaxAttempts() {
allocator := &PortAllocator{
reservedPorts: make(map[int]time.Time),
ttl: time.Second,
maxAttempts: 3,
}

// Reserve all possible user ports (1024-65535) to force GetFreePort to fail
// System ports (1-1023) are not used as they typically require elevated privileges
for i := 1024; i <= 65535; i++ {
allocator.reservedPorts[i] = time.Now().Add(time.Minute)
}

// Should fail after maxAttempts since all ports are reserved
_, err := allocator.GetFreePort()
s.Require().Error(err)
s.Contains(err.Error(), "failed to find an available port after 3 attempts")
}

// TestConcurrentPortAllocation verifies thread-safety of port allocation
func (s *PortAllocatorTestSuite) TestConcurrentPortAllocation() {
var wg sync.WaitGroup
allocator := NewPortAllocator(time.Second, 10)
ports := make(map[int]bool)
var mu sync.Mutex

// Spawn 20 goroutines to allocate ports concurrently
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
port, err := allocator.GetFreePort()
s.Require().NoError(err)

mu.Lock()
s.False(ports[port], "port %d was allocated multiple times", port)
ports[port] = true
mu.Unlock()

// Verify we can listen on the port
l, err := net.Listen("tcp", ":"+strconv.Itoa(port))
s.Require().NoError(err)
l.Close()
}()
}
wg.Wait()
}

// TestIsPortOpen verifies the port availability check
func (s *PortAllocatorTestSuite) TestIsPortOpen() {
port, err := GetFreePort()
s.Require().NoError(err)
s.True(IsPortOpen(port), "newly allocated port should be open")

// Listen on the port
l, err := net.Listen("tcp", ":"+strconv.Itoa(port))
s.Require().NoError(err)
s.Equal(count, len(ports), "expected %d ports", count)
defer l.Close()

for _, port := range ports {
s.NotEqual(0, port, "expected a non-zero port")
// Port should now be in use
s.False(IsPortOpen(port), "port should be in use")
}

// Try to listen on the port
l, err := net.Listen("tcp", ":"+strconv.Itoa(port))
s.Require().NoError(err, "failed to listen on newly given port")
defer l.Close()
// TestGlobalAllocator verifies the global allocator behavior
func (s *PortAllocatorTestSuite) TestGlobalAllocator() {
usedPorts := make(map[int]bool)
for i := 0; i < 3; i++ {
port, err := GetFreePort()
s.Require().NoError(err)
s.False(usedPorts[port], "global allocator reused port %d", port)
usedPorts[port] = true
}
}

0 comments on commit c5a922c

Please sign in to comment.