Skip to content

Commit

Permalink
cool#9833: Mitigate connection count limitation, fix timeout-checking…
Browse files Browse the repository at this point in the history
… code (*WIP*)

This commit is a WIP, pushed for discussion and to be amended.
(Cleaned up version with general code cleanups and Util artefacts removed for clarity)

- All network limits and timeouts are configurable now, see COOLWSD
        { "net.ws.ping.timeout", "2000000" }, // WebSocketHandler ping timeout in us (2s). Zero disables metric.
        { "net.ws.ping.period", "3000000" }, // WebSocketHandler ping period in us (3s), i.e. duration until next ping.
        { "net.http.timeout", "30000000" }, // http::Session timeout in us (30s). Zero disables metric.
        { "net.maxconnections", "100000" }, // Socket maximum connections (100000). Zero disables metric.
        { "net.maxduration", "43200" }, // Socket maximum duration in seconds (12h). Zero disables metric.
        { "net.minbps", "0" }, // Socket minimum bits per seconds throughput (0). Increase for debugging. Zero disables metric.
        { "net.socketpoll.timeout", "64000000" }, // SocketPoll timeout in us (64s).
  and net::Defaults in NetUtil.hpp

- ProtocolHandlerInterface::checkTimeout(..)
  is now called via StreamSocket::checkRemoval()
  to ensure tests even w/o input data (was: handlePoll())

- ProtocolHandlerInterface::checkTimeout(..)
  - Add bool return value: true -> shutdown connection, caller shall stop processing
  - Implemented for http::Session
    - Timeout (30s) net::Defaults::HTTPTimeout with missing response
  - Implemented for WebSocketHandler
    - Timeout (2s = net::Defaults::WSPingTimeout)
      after missing WS native frame ping/pong (server only)

- StreamSocket -> Socket (properties moved)
  - bytes sent/received
  - closed state

- Socket (added properties)
  - creation- and last-seen -time
  - socket type and port
  - checkRemoval(..)
    - called directly from SocketPoll::poll()
    - only for IPv4/v6 network connections
    - similar to ProtocolHandlerInterface::checkTimeout(..)
    - added further criteria (age, throughput, ..)
      - Timeout (64s = net::Defaults::SocketPollTimeout)
        if (now - lastSeen) > timeout
      - Timeout (12 hours) net::Defaults::MaxDuration
        if (now - creationTime) > timeout
      - TODO: Throughput/bandwitdh disabled, find proper metrics/timing
      - TODO: Add maximimal IPv4/IPv6 socket-count criteria, drop oldest.

- SocketPoll::poll()
  - Additionally erases if !socket->isOpen() || socket->checkRemoval()

Signed-off-by: Sven Göthel <[email protected]>
Change-Id: I7e1a9329e0848c40a210f6250e29e26950da6fbc
  • Loading branch information
Sven Göthel committed Sep 6, 2024
1 parent 9da6eb8 commit 6fc374e
Show file tree
Hide file tree
Showing 18 changed files with 1,429 additions and 171 deletions.
96 changes: 96 additions & 0 deletions common/Util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <cstring>
#include <algorithm>
#include <atomic>
#include <limits>
#include <mutex>
#include <ratio>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -148,6 +150,100 @@ namespace Util
uint64_t _startSys;
};

// Time-weighted average
class TimeAverage
{
public:
typedef std::chrono::steady_clock::time_point time_point;

private:
time_point _t0;
time_point _t1;
double _v1;
double _area;

public:
// default ctor without value initialization
constexpr TimeAverage() noexcept
: _v1(0)
, _area(0)
{
}

// ctor with value initialization
constexpr TimeAverage(time_point t, double v) noexcept
: _t0(t)
, _t1(t)
, _v1(v)
, _area(0.0)
{
}

// returns true if initialized via reset() or ctor
constexpr bool initialized() const noexcept
{
return _t0.time_since_epoch() != time_point::duration::zero();
}

// initialize instance with given values
void reset(time_point t, double v) noexcept
{
_t0 = t;
_t1 = t;
_v1 = v;
_area = 0.0;
}

// move startTime() and endTime() to given time-point maintaining duration()
void moveTo(time_point t) noexcept
{
auto dt = t - _t1;
_t0 += dt;
_t1 += dt;
}

// Adds an arbitrary value at given time-point, returns new average
// May reset() instance and returns `vn` if not yet initialized()
double add(time_point t, double v) noexcept
{
if (!initialized())
{
reset(t, v);
return v;
}
else
{
// time*value area accumulation in seconds
const double dt1 = std::chrono::duration<double>(t - _t1).count();
// area(rectangle + triangle)
// _area += dt1 * _v1 + (dt1 * (v - _v1) * 0.5);
_area += dt1 * 0.5 * (_v1 + v);
_t1 = t;
_v1 = v;
return average();
}
}

// Returns average
constexpr double average() const noexcept
{
const double dt = duration();
return dt > std::numeric_limits<double>::epsilon() ? _area / dt : _v1;
}

// Returns last value
constexpr double last() const noexcept { return _v1; }
// Returns last time-point
constexpr std::chrono::steady_clock::time_point lastTime() const noexcept { return _t1; }
// Returns first time-point
constexpr std::chrono::steady_clock::time_point startTime() const noexcept { return _t0; }
// Returns duration in seconds
constexpr double duration() const noexcept
{
return std::chrono::duration<double>(_t1 - _t0).count();
}
};

class DirectoryCounter
{
void *_tasks;
Expand Down
3 changes: 2 additions & 1 deletion kit/Kit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include <common/JsonUtil.hpp>
#include "KitHelper.hpp"
#include "Kit.hpp"
#include <NetUtil.hpp>
#include <Protocol.hpp>
#include <Log.hpp>
#include <Png.hpp>
Expand Down Expand Up @@ -2838,7 +2839,7 @@ void documentViewCallback(const int type, const char* payload, void* data)
int pollCallback(void* pData, int timeoutUs)
{
if (timeoutUs < 0)
timeoutUs = SocketPoll::DefaultPollTimeoutMicroS.count();
timeoutUs = net::Defaults::get().SocketPollTimeout.count();
#ifndef IOS
if (!pData)
return 0;
Expand Down
27 changes: 19 additions & 8 deletions net/HttpRequest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1199,9 +1199,9 @@ class Session final : public ProtocolHandlerInterface
}

/// Returns the default timeout.
static constexpr std::chrono::milliseconds getDefaultTimeout()
static std::chrono::milliseconds getDefaultTimeout()
{
return std::chrono::seconds(30);
return std::chrono::duration_cast<std::chrono::milliseconds>( net::Defaults::get().HTTPTimeout );
}

/// Returns the current protocol scheme.
Expand Down Expand Up @@ -1432,7 +1432,10 @@ class Session final : public ProtocolHandlerInterface
while (!_response->done())
{
const auto now = std::chrono::steady_clock::now();
checkTimeout(now);
if (checkTimeout(now))
{
return false;
}

const auto remaining =
std::chrono::duration_cast<std::chrono::microseconds>(deadline - now);
Expand Down Expand Up @@ -1692,17 +1695,23 @@ class Session final : public ProtocolHandlerInterface
net::asyncConnect(_host, _port, isSecure(), shared_from_this(), pushConnectCompleteToPoll);
}

void checkTimeout(std::chrono::steady_clock::time_point now) override
bool checkTimeout(std::chrono::steady_clock::time_point now) override
{
if (!_response || _response->done())
return;
{
return false;
}

const std::chrono::microseconds timeout = getTimeout();
const auto duration =
std::chrono::duration_cast<std::chrono::milliseconds>(now - _startTime);
if (now < _startTime || duration > getTimeout() || SigUtil::getTerminationFlag())

if (now < _startTime ||
(timeout > std::chrono::microseconds::zero() && duration > timeout) ||
SigUtil::getTerminationFlag())
{
LOG_WRN("Timed out while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after " << duration);
LOG_WRN("CheckTimeout: Timeout while requesting [" << _request.getVerb() << ' ' << _host
<< _request.getUrl() << "] after " << duration);

// Flag that we timed out.
_response->timeout();
Expand All @@ -1712,7 +1721,9 @@ class Session final : public ProtocolHandlerInterface
// no good maintaining a poor connection (if that's the issue).
onDisconnect(); // Trigger manually (why wait for poll to do it?).
assert(isConnected() == false);
return true;
}
return false;
}

int sendTextMessage(const char*, const size_t, bool) const override { return 0; }
Expand Down
47 changes: 47 additions & 0 deletions net/NetUtil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

#pragma once

#include <JailUtil.hpp>
#include <Poco/DefaultStrategy.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <string>
#include <memory>
Expand All @@ -28,6 +32,49 @@ struct sockaddr;
namespace net
{

class Defaults
{
public:
/// WebSocketHandler ping timeout in us (2s default). Zero disables metric.
std::chrono::microseconds WSPingTimeout;
/// WebSocketHandler ping period in us (3s default), i.e. duration until next ping. Zero disables metric.
std::chrono::microseconds WSPingPeriod;
/// http::Session timeout in us (30s default). Zero disables metric.
std::chrono::microseconds HTTPTimeout;

/// Socket maximum connections (100000). Zero disables metric.
size_t MaxConnectionCount;
/// Socket maximum duration in seconds (12h). Zero disables metric.
std::chrono::seconds MaxDuration;
/// Socket minimum bits per seconds throughput (0). Zero disables metric.
double MinBytesPerSec;

/// Socket poll timeout in us (64s), useful to increase for debugging.
std::chrono::microseconds SocketPollTimeout;

private:
Defaults()
: WSPingTimeout(std::chrono::milliseconds(2000))
, WSPingPeriod(std::chrono::milliseconds(3000))
, HTTPTimeout(std::chrono::milliseconds(30000))
, MaxConnectionCount(100000)
, MaxDuration(std::chrono::seconds(43200))
, MinBytesPerSec(0.0)
, SocketPollTimeout(std::chrono::seconds(64))
{
}

public:
Defaults(const Defaults&) = delete;
Defaults(Defaults&&) = delete;

static Defaults& get()
{
static Defaults def;
return def;
}
};

#if !MOBILEAPP

class HostEntry
Expand Down
2 changes: 1 addition & 1 deletion net/ServerSocket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class LocalServerSocket : public ServerSocket
ServerSocket(Socket::Type::Unix, clientPoller, std::move(sockFactory))
{
}
~LocalServerSocket();
~LocalServerSocket() override;

bool bind(Type, int) override { assert(false); return false; }
std::shared_ptr<Socket> accept() override;
Expand Down
Loading

0 comments on commit 6fc374e

Please sign in to comment.