Skip to content

Commit

Permalink
cool#9833: Mitigate connection count limitation
Browse files Browse the repository at this point in the history
- ProtocolHandlerInterface::checkTimeout(..)
  is now called via StreamSocket::checkRemoval()
  to ensure tests even w/o input data (was: handlePoll())

- ProtocolHandlerInterface::checkTimeout(..)
  - Add bool return value: true -> shutdown connection, caller shall stop processing
  - Implemented for http::Session
    - Timeout (30s) net::Config::HTTPTimeout with missing response
  - Implemented for WebSocketHandler
    - Timeout (2s = net::Config::WSPingTimeout)
      after missing WS native frame ping/pong (server only)

- StreamSocket::checkRemoval(..)
  - called directly from SocketPoll::poll()
  - only for IPv4/v6 network connections
  - similar to ProtocolHandlerInterface::checkTimeout(..)
  - calls ProtocolHandlerInterface::checkTimeout(..)
  - added further criteria (age, throughput, ..)
    - Timeout (64s = net::Config::SocketPollTimeout)
      if (now - lastSeen) > timeout
    - Timeout (12 hours) net::Config::MaxDuration
      if (now - creationTime) > timeout
    - TODO: Throughput/bandwitdh disabled, find proper metrics/timing
    - TODO: Add maximimal IPv4/IPv6 socket-count criteria, drop oldest.

- SocketPoll::poll()
  - Additionally erases if !socket->isOpen() || socket->checkRemoval()

Signed-off-by: Sven Göthel <[email protected]>
Change-Id: I7e1a9329e0848c40a210f6250e29e26950da6fbc
  • Loading branch information
Sven Göthel committed Sep 9, 2024
1 parent 63c4ba8 commit 661dcde
Show file tree
Hide file tree
Showing 10 changed files with 904 additions and 44 deletions.
23 changes: 17 additions & 6 deletions net/HttpRequest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,10 @@ class Session final : public ProtocolHandlerInterface
while (!_response->done())
{
const auto now = std::chrono::steady_clock::now();
checkTimeout(now);
if (checkTimeout(now))
{
return false;
}

const auto remaining =
std::chrono::duration_cast<std::chrono::microseconds>(deadline - now);
Expand Down Expand Up @@ -1692,17 +1695,23 @@ class Session final : public ProtocolHandlerInterface
net::asyncConnect(_host, _port, isSecure(), shared_from_this(), pushConnectCompleteToPoll);
}

void checkTimeout(std::chrono::steady_clock::time_point now) override
bool checkTimeout(std::chrono::steady_clock::time_point now) override
{
if (!_response || _response->done())
return;
{
return false;
}

const std::chrono::microseconds timeout = getTimeout();
const auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(now - _startTime);
if (now < _startTime || duration > getTimeout() || SigUtil::getTerminationFlag())

if (now < _startTime ||
(timeout > std::chrono::microseconds::zero() && duration > timeout) ||
SigUtil::getTerminationFlag())
{
LOG_WRN("Timed out while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after " << duration);
LOG_WRN("CheckTimeout: Timeout while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after " << duration);

// Flag that we timed out.
_response->timeout();
Expand All @@ -1712,7 +1721,9 @@ class Session final : public ProtocolHandlerInterface
// no good maintaining a poor connection (if that's the issue).
onDisconnect(); // Trigger manually (why wait for poll to do it?).
assert(isConnected() == false);
return true;
}
return false;
}

int sendTextMessage(const char*, const size_t, bool) const override { return 0; }
Expand Down
67 changes: 66 additions & 1 deletion net/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,17 @@ int SocketPoll::poll(int64_t timeoutMaxMicroS)
<< " of " << _pollSockets.size() << ") from " << _name);
_pollSockets[i] = nullptr;
}
else if( _pollSockets[i]->checkRemoval(newNow) )
{
// timed out socket .. also checks
// ProtocolHandlerInterface
// - http::Session::checkTimeout() OK
// - WebSocketHandler::checkTimeout() OK
++itemsErased;
LOGA_TRC(Socket, '#' << _pollFds[i].fd << ": Removing socket (at " << i
<< " of " << _pollSockets.size() << ") from " << _name);
_pollSockets[i] = nullptr;
}
else if (_pollFds[i].fd == _pollSockets[i]->getFD())
{
SocketDisposition disposition(_pollSockets[i]);
Expand Down Expand Up @@ -995,7 +1006,8 @@ void WebSocketHandler::dumpState(std::ostream& os, const std::string& /*indent*/
{
os << (_shuttingDown ? "shutd " : "alive ");
#if !MOBILEAPP
os << std::setw(5) << _pingTimeUs/1000. << "ms ";
os << std::setw(5) << _pingMicroS.last()/1000. << "ms, avg "
<< _pingMicroS.average()/1000. << "ms ";
#endif
if (_wsPayload.size() > 0)
Util::dumpHex(os, _wsPayload, "\t\tws queued payload:\n", "\t\t");
Expand Down Expand Up @@ -1404,6 +1416,59 @@ std::string StreamSocket::toString() const noexcept
return s;
}

bool StreamSocket::checkRemoval(std::chrono::steady_clock::time_point now) noexcept
{
if (!isIPType()) // forced removal on IPv[46] network connections only
{
return false;
}
const auto durTotal =
std::chrono::duration_cast<std::chrono::milliseconds>(now - getCreationTime());
const auto durLast =
std::chrono::duration_cast<std::chrono::milliseconds>(now - getLastSeenTime());
const double bytesPerSecIn = durTotal.count() > 0 ? (double)getBytesRcvd() / ((double)durTotal.count() / 1000.0) : (double)getBytesRcvd();
const bool c1 = now < getCreationTime();
const bool c2 = (_maxDuration > std::chrono::microseconds::zero() && durTotal > _maxDuration);
const bool c3 = (_pollTimeout > std::chrono::microseconds::zero() && durLast > _pollTimeout);
const bool c4 = (bytesPerSecIn > std::numeric_limits<double>::epsilon() &&
_minBytesPerSec > 1.0f && bytesPerSecIn < _minBytesPerSec);
const bool c5 = SigUtil::getTerminationFlag();
if (c1 || c2 || c3 || c4 || c5)
{
LOG_WRN("CheckRemoval: Timeout: {c1 " << c1 << ", c2 " << c2 << ", c3 " << c3 << ", c4 " << c4 << ", c5 " << c5 << "}, "
<< getStatsString(now) << ", "
<< toString());
if (_socketHandler)
{
_socketHandler->onDisconnect();
if( isOpen() ) {
// FIXME: Ensure proper semantics of onDisconnect()

Check notice

Code scanning / CodeQL

FIXME comment Note

FIXME comment: Ensure proper semantics of onDisconnect()
LOG_WRN("Socket still open post onDisconnect(), forced shutdown.");
shutdown(); // signal
closeConnection(); // real -> setClosed()
assert(isOpen() == false); // should have issued shutdown
}
}
else
{
shutdown(); // signal
closeConnection(); // real -> setClosed()
assert(isOpen() == false); // should have issued shutdown
}
return true;
}
else if (_socketHandler && _socketHandler->checkTimeout(now))
{
assert(isOpen() == false); // should have issued shutdown
setClosed();
LOG_WRN("CheckRemoval: Timeout: " << getStatsString(now) << ", " << toString());
return true;
} else {
LOG_DBG("CheckRemoval: Test " << getStatsString(now) << ", " << toString());
}
return false;
}

#if !MOBILEAPP

bool StreamSocket::parseHeader(const char *clientName,
Expand Down
9 changes: 5 additions & 4 deletions net/Socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,9 @@ class ProtocolHandlerInterface :
virtual int getPollEvents(std::chrono::steady_clock::time_point now,
int64_t &timeoutMaxMicroS) = 0;

/// Do we need to handle a timeout ?
virtual void checkTimeout(std::chrono::steady_clock::time_point /* now */) {}
/// Checks whether a timeout has occurred. Method will shutdown connection and socket on timeout.
/// Returns true in case of a timeout, caller shall stop processing
virtual bool checkTimeout(std::chrono::steady_clock::time_point /* now */) { return false; }

/// Do some of the queued writing.
virtual void performWrites(std::size_t capacity) = 0;
Expand Down Expand Up @@ -1083,6 +1084,8 @@ class StreamSocket : public Socket,

std::string toString() const noexcept override;

bool checkRemoval(std::chrono::steady_clock::time_point now) noexcept override;

/// Just trigger the async shutdown.
void shutdown() override
{
Expand Down Expand Up @@ -1412,8 +1415,6 @@ class StreamSocket : public Socket,
{
ASSERT_CORRECT_SOCKET_THREAD(this);

_socketHandler->checkTimeout(now);

if (!events && _inBuffer.empty())
return;

Expand Down
112 changes: 88 additions & 24 deletions net/WebSocketHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

#pragma once

#include "NetUtil.hpp"
#include "Socket.hpp"
#include "common/Common.hpp"
#include "common/Log.hpp"
#include "common/Protocol.hpp"
#include "common/Unit.hpp"
#include "common/Util.hpp"
#include <limits>
#include <net/HttpRequest.hpp>

#include <Poco/Net/HTTPResponse.h>
Expand All @@ -33,8 +35,11 @@ class WebSocketHandler : public ProtocolHandlerInterface
std::weak_ptr<StreamSocket> _socket;

#if !MOBILEAPP
std::chrono::microseconds _pingPeriod;
std::chrono::duration<double, std::micro> _pingTimeout;
std::chrono::steady_clock::time_point _lastPingSentTime;
int _pingTimeUs;
size_t _openPingCount;
Util::TimeAverage _pingMicroS;
bool _isMasking;
bool _inFragmentBlock;
/// The security key. Meaningful only for clients.
Expand All @@ -60,7 +65,6 @@ class WebSocketHandler : public ProtocolHandlerInterface
};

static constexpr std::chrono::microseconds InitialPingDelayMicroS = std::chrono::milliseconds(25);
static constexpr std::chrono::microseconds PingFrequencyMicroS = std::chrono::seconds(18);

public:
/// Perform upgrade ourselves, or select a client web socket.
Expand All @@ -71,10 +75,12 @@ class WebSocketHandler : public ProtocolHandlerInterface
WebSocketHandler(bool isClient, [[maybe_unused]] bool isMasking)
:
#if !MOBILEAPP
_pingPeriod(net::Config::get().WSPingPeriod),
_pingTimeout(net::Config::get().WSPingTimeout),
_lastPingSentTime(std::chrono::steady_clock::now() -
std::chrono::microseconds(PingFrequencyMicroS) +
_pingPeriod +
std::chrono::microseconds(InitialPingDelayMicroS))
, _pingTimeUs(0)
, _openPingCount(0)
, _isMasking(isClient && isMasking)
, _inFragmentBlock(false)
, _key(isClient ? generateKey() : std::string())
Expand Down Expand Up @@ -410,10 +416,18 @@ class WebSocketHandler : public ProtocolHandlerInterface
if (_isClient)
LOG_WRN("Servers should not send pongs, only clients");

_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>
(std::chrono::steady_clock::now() - _lastPingSentTime).count();
LOGA_TRC(WebSocket, "Pong received: " << _pingTimeUs << " microseconds");
gotPing(code, _pingTimeUs);
const auto now = std::chrono::steady_clock::now();
const int64_t us = std::chrono::duration_cast<std::chrono::microseconds>
(now - _lastPingSentTime).count();
const double avg_us = _pingMicroS.add(_lastPingSentTime, static_cast<double>(us));
if (_openPingCount > 0)
{
--_openPingCount;
}
LOGA_TRC(WebSocket, "Pong received: " << us << "us, avg " << avg_us << "us over "
<< (int)_pingMicroS.duration() << "s, open pings left "
<< _openPingCount);
gotPing(code, static_cast<int>(us));
}
break;
case WSOpCode::Ping:
Expand All @@ -422,10 +436,12 @@ class WebSocketHandler : public ProtocolHandlerInterface
LOG_DBG("Clients should not send pings, only servers");

const auto now = std::chrono::steady_clock::now();
_pingTimeUs = std::chrono::duration_cast<std::chrono::microseconds>
(now - _lastPingSentTime).count();
const int64_t us = std::chrono::duration_cast<std::chrono::microseconds>
(now - _lastPingSentTime).count();
const double avg_us = _pingMicroS.add(_lastPingSentTime, static_cast<double>(us));
sendPong(now, &ctrlPayload[0], payloadLen, socket);
gotPing(code, _pingTimeUs);
LOGA_TRC(WebSocket, "Ping received: " << us << " us -> avg " << avg_us << " us");
gotPing(code, static_cast<int>(us));
}
break;
case WSOpCode::Close:
Expand Down Expand Up @@ -593,7 +609,7 @@ class WebSocketHandler : public ProtocolHandlerInterface
const auto timeSincePingMicroS
= std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime);
timeoutMaxMicroS
= std::min(timeoutMaxMicroS, (int64_t)(PingFrequencyMicroS - timeSincePingMicroS).count());
= std::min(timeoutMaxMicroS, (int64_t)(_pingPeriod - timeSincePingMicroS).count());
}
#endif
int events = POLLIN;
Expand All @@ -604,7 +620,14 @@ class WebSocketHandler : public ProtocolHandlerInterface

#if !MOBILEAPP
private:
/// Send a ping message
/// Sets last ping sent and received time, avoiding a timout
void setPingPongTime(std::chrono::steady_clock::time_point now) noexcept
{
_lastPingSentTime = now;
_pingMicroS.moveTo(now);
}

/// Sends a native control-frame ping or pong message
void sendPingOrPong(std::chrono::steady_clock::time_point now,
const char* data, const size_t len,
const WSOpCode code,
Expand All @@ -616,48 +639,89 @@ class WebSocketHandler : public ProtocolHandlerInterface
if (!socket->isWebSocket())
{
LOG_WRN("Attempted ping on non-upgraded websocket! #" << socket->getFD());
_lastPingSentTime = now; // Pretend we sent it to avoid timing out immediately.
setPingPongTime(now); // Pretend we sent it to avoid timing out immediately.
return;
}

LOGA_TRC(WebSocket, "Sending " << (const char*)(code == WSOpCode::Ping ? " ping" : "pong"));
if (code == WSOpCode::Ping)
{
++_openPingCount;
LOGA_TRC(WebSocket, "Sending ping, open pings left " << _openPingCount);
}
else
{
LOGA_TRC(WebSocket, "Sending pong");
}
_lastPingSentTime = now;
// FIXME: allow an empty payload.
sendMessage(data, len, code, false);
_lastPingSentTime = now;
}

public:
/// Sends a native control-frame ping message
void sendPing(std::chrono::steady_clock::time_point now,
const std::shared_ptr<StreamSocket>& socket)
{
// assert(!_isClient);
if (_isClient)
LOG_DBG("Clients should not send pings, only servers");
// assert(_isClient);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
sendPingOrPong(now, "", 1, WSOpCode::Ping, socket);
}
#if 0
/// Sends a native control-frame ping message
/// Only for debugging purposes. NOT WORKING (SOCKET THREAD AFFINITY)
void sendPing(std::chrono::steady_clock::time_point now)
{
std::shared_ptr<StreamSocket> socket = getSocket().lock();
if (socket) {
sendPing(now, socket);
} else {
LOG_DBG("sendPing: Socket n/a yet");
}
}
#endif

/// Sends a native control-frame pong message
void sendPong(std::chrono::steady_clock::time_point now,
const char* data, const size_t len,
const std::shared_ptr<StreamSocket>& socket)
{
if (!_isClient)
LOG_WRN("Servers should not send pongs, only clients");
// assert(!_isClient);

Check notice

Code scanning / CodeQL

Commented-out code Note

This comment appears to contain commented-out code.
sendPingOrPong(now, data, len, WSOpCode::Pong, socket);
}
#endif

/// Do we need to handle a timeout ?
void checkTimeout([[maybe_unused]] std::chrono::steady_clock::time_point now) override
bool checkTimeout([[maybe_unused]] std::chrono::steady_clock::time_point now) override
{
#if !MOBILEAPP
if (_isClient)
return;
{
return false;
}

const auto timeSincePingMicroS
= std::chrono::duration_cast<std::chrono::microseconds>(now - _lastPingSentTime);
if (timeSincePingMicroS >= PingFrequencyMicroS)
if (_openPingCount > 0 && _pingTimeout.count() > std::numeric_limits<double>::epsilon() &&
_pingMicroS.average() >= _pingTimeout.count())
{
LOG_WRN("CheckTimeout: Timeout websocket: Ping: count "
<< _openPingCount << ", last " << _pingMicroS.last() << "us, avg "
<< _pingMicroS.average() << "us >= " << _pingTimeout.count() << "us over "
<< (int)_pingMicroS.duration() << "s");
shutdownSilent();
return true;
}
if (!_pingMicroS.initialized() || (_pingPeriod > std::chrono::microseconds::zero() &&
now - _pingMicroS.lastTime() >= _pingPeriod))
{
const std::shared_ptr<StreamSocket> socket = _socket.lock();
if (socket)
{
sendPing(now, socket);
}
}
#endif
return false;
}

public:
Expand Down Expand Up @@ -1070,7 +1134,7 @@ class WebSocketHandler : public ProtocolHandlerInterface
#if !MOBILEAPP
// No need to ping right upon connection/upgrade,
// but do reset the time to avoid pinging immediately after.
_lastPingSentTime = std::chrono::steady_clock::now();
setPingPongTime(std::chrono::steady_clock::now());
#endif
}

Expand Down
Loading

0 comments on commit 661dcde

Please sign in to comment.