Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle limited open Connections due to keepalive connections #9916

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
96 changes: 96 additions & 0 deletions common/Util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <cstring>
#include <algorithm>
#include <atomic>
#include <limits>
#include <mutex>
#include <ratio>
#include <set>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -148,6 +150,100 @@ namespace Util
uint64_t _startSys;
};

// Time-weighted average
class TimeAverage
{
public:
typedef std::chrono::steady_clock::time_point time_point;

private:
time_point _t0;
time_point _t1;
double _v1;
double _area;

public:
// default ctor without value initialization
constexpr TimeAverage() noexcept
: _v1(0)
, _area(0)
{
}

// ctor with value initialization
constexpr TimeAverage(time_point t, double v) noexcept
: _t0(t)
, _t1(t)
, _v1(v)
, _area(0.0)
{
}

// returns true if initialized via reset() or ctor
constexpr bool initialized() const noexcept
{
return _t0.time_since_epoch() != time_point::duration::zero();
}

// initialize instance with given values
void reset(time_point t, double v) noexcept
{
_t0 = t;
_t1 = t;
_v1 = v;
_area = 0.0;
}

// move startTime() and endTime() to given time-point maintaining duration()
void moveTo(time_point t) noexcept
{
auto dt = t - _t1;
_t0 += dt;
_t1 += dt;
}

// Adds an arbitrary value at given time-point, returns new average
// May reset() instance and returns `vn` if not yet initialized()
double add(time_point t, double v) noexcept
{
if (!initialized())
{
reset(t, v);
return v;
}
else
{
// time*value area accumulation in seconds
const std::chrono::duration<double> dt1 = std::chrono::duration<double>(t - _t1);
// area(rectangle + triangle)
// _area += dt1.count() * _v1 + (dt1.count() * (v - _v1) * 0.5);
_area += dt1.count() * 0.5 * (_v1 + v);
_t1 = t;
_v1 = v;
return average();
}
}

// Returns average
constexpr double average() const noexcept
{
const double dt = duration();
return dt > std::numeric_limits<double>::epsilon() ? _area / dt : _v1;
}

// Returns last value
constexpr double last() const noexcept { return _v1; }
// Returns last time-point
constexpr std::chrono::steady_clock::time_point lastTime() const noexcept { return _t1; }
// Returns first time-point
constexpr std::chrono::steady_clock::time_point startTime() const noexcept { return _t0; }
// Returns duration in seconds
constexpr double duration() const noexcept
{
return std::chrono::duration<double>(_t1 - _t0).count();
}
};

class DirectoryCounter
{
void *_tasks;
Expand Down
8 changes: 4 additions & 4 deletions kit/Kit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include <common/JsonUtil.hpp>
#include "KitHelper.hpp"
#include "Kit.hpp"
#include <NetUtil.hpp>
#include <Protocol.hpp>
#include <Log.hpp>
#include <Png.hpp>
Expand Down Expand Up @@ -1448,7 +1449,7 @@ bool Document::forkToSave(const std::function<void()> &childSave, int viewId)
// FIXME: defer and queue a 2nd save if queued during save ...

std::shared_ptr<StreamSocket> parentSocket, childSocket;
if (!StreamSocket::socketpair(parentSocket, childSocket))
if (!StreamSocket::socketpair(start, parentSocket, childSocket))
return false;

// To encode into the child process id for debugging
Expand Down Expand Up @@ -2838,7 +2839,7 @@ void documentViewCallback(const int type, const char* payload, void* data)
int pollCallback(void* pData, int timeoutUs)
{
if (timeoutUs < 0)
timeoutUs = SocketPoll::DefaultPollTimeoutMicroS.count();
timeoutUs = net::Config::get().SocketPollTimeout.count();
#ifndef IOS
if (!pData)
return 0;
Expand Down Expand Up @@ -3500,8 +3501,7 @@ void lokit_main(
}
}

if (!mainKit->insertNewUnixSocket(MasterLocation, pathAndQuery, websocketHandler,
&shareFDs))
if (!mainKit->insertNewUnixSocket(MasterLocation, pathAndQuery, websocketHandler, &shareFDs))
{
LOG_SFL("Failed to connect to WSD. Will exit.");
Util::forcedExit(EX_SOFTWARE);
Expand Down
76 changes: 42 additions & 34 deletions net/DelaySocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
/// Reads from fd, delays that and then writes to _dest.
class DelaySocket : public Socket {
int _delayMs;
enum State { ReadWrite, // normal socket
EofFlushWrites, // finish up writes and close
Closed };
enum class State : uint8_t { ReadWrite, // normal socket
EofFlushWrites, // finish up writes and close
Closed };

static const char* toString(State s) noexcept {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't using STATE_ENUM be better?

Let's reuse and improve what we have.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same thought, but then didn't want to change even more code. So yes, we could reuse our macro .. even if it would render the enum underling bigger (I used uint8_t .. the macro uses int in general, no biggy).

if( s == State::ReadWrite ) return "ReadWrite";
else if( s == State::EofFlushWrites ) return "EofFlushWrites";
else return "Closed";
}

State _state;
std::shared_ptr<DelaySocket> _dest; // our writing twin.

Expand All @@ -48,9 +55,9 @@

std::vector<std::shared_ptr<WriteChunk>> _chunks;
public:
DelaySocket(int delayMs, int fd) :
Socket (fd, Socket::Type::Unix), _delayMs(delayMs),
_state(ReadWrite)
DelaySocket(int delayMs, int fd, std::chrono::steady_clock::time_point creationTime)
: Socket (fd, Socket::Type::Unix, creationTime),
_delayMs(delayMs), _state(State::ReadWrite)
{
// setSocketBufferSize(Socket::DefaultSendBufferSize);
}
Expand All @@ -61,7 +68,7 @@

void dumpState(std::ostream& os) override
{
os << "\tfd: " << getFD()
os << "\tfd: " << fd()
<< "\n\tqueue: " << _chunks.size() << '\n';
auto now = std::chrono::steady_clock::now();
for (auto &chunk : _chunks)
Expand All @@ -83,7 +90,7 @@
int64_t remainingMicroS = std::chrono::duration_cast<std::chrono::microseconds>(
(*_chunks.begin())->getSendTime() - now).count();
if (remainingMicroS < timeoutMaxMicroS)
DELAY_LOG('#' << getFD() << " reset timeout max to " << remainingMicroS
DELAY_LOG('#' << fd() << " reset timeout max to " << remainingMicroS
<< "us from " << timeoutMaxMicroS << "us\n");
timeoutMaxMicroS = std::min(timeoutMaxMicroS, remainingMicroS);
}
Expand All @@ -104,45 +111,45 @@
{
switch (newState)
{
case ReadWrite:
case State::ReadWrite:
assert (false);
break;
case EofFlushWrites:
assert (_state == ReadWrite);
case State::EofFlushWrites:
assert (_state == State::ReadWrite);
assert (_dest);
_dest->pushCloseChunk();
_dest = nullptr;
break;
case Closed:
if (_dest && _state == ReadWrite)
case State::Closed:
if (_dest && _state == State::ReadWrite)
_dest->pushCloseChunk();
_dest = nullptr;
shutdown();
break;
}
DELAY_LOG('#' << getFD() << " changed to state " << newState << '\n');
DELAY_LOG('#' << fd() << " changed to state " << toString(newState) << '\n');
_state = newState;
}

void handlePoll(SocketDisposition &disposition,
std::chrono::steady_clock::time_point now, int events) override
{
if (_state == ReadWrite && (events & POLLIN))
if (_state == State::ReadWrite && (events & POLLIN))
{
auto chunk = std::make_shared<WriteChunk>(_delayMs);

char buf[64 * 1024];
ssize_t len;
size_t toRead = sizeof(buf); //std::min(sizeof(buf), WindowSize - _chunksSize);
do {
len = ::read(getFD(), buf, toRead);
len = ::read(fd(), buf, toRead);
} while (len < 0 && errno == EINTR);

if (len == 0) // EOF.
changeState(EofFlushWrites);
changeState(State::EofFlushWrites);
else if (len >= 0)
{
DELAY_LOG('#' << getFD() << " read " << len
DELAY_LOG('#' << fd() << " read " << len
<< " to queue: " << _chunks.size() << '\n');
chunk->getData().insert(chunk->getData().end(), &buf[0], &buf[len]);
if (_dest)
Expand All @@ -152,15 +159,15 @@
}
else if (errno != EAGAIN && errno != EWOULDBLOCK)
{
DELAY_LOG('#' << getFD() << " error : " << Util::symbolicErrno(errno) << ": " << strerror(errno) << '\n');
changeState(Closed); // FIXME - propagate the error ?
DELAY_LOG('#' << fd() << " error : " << Util::symbolicErrno(errno) << ": " << strerror(errno) << '\n');
changeState(State::Closed); // FIXME - propagate the error ?
Dismissed Show dismissed Hide dismissed
}
}

if (_chunks.empty())
{
if (_state == EofFlushWrites)
changeState(Closed);
if (_state == State::EofFlushWrites)
changeState(State::Closed);
}
else // Write if we have delayed enough.
{
Expand All @@ -170,35 +177,35 @@
{
if (chunk->getData().empty())
{ // delayed error or close
DELAY_LOG('#' << getFD() << " handling delayed close\n");
changeState(Closed);
DELAY_LOG('#' << fd() << " handling delayed close\n");
changeState(State::Closed);
}
else
{
ssize_t len;
do {
len = ::write(getFD(), &chunk->getData()[0], chunk->getData().size());
len = ::write(fd(), &chunk->getData()[0], chunk->getData().size());
} while (len < 0 && errno == EINTR);

if (len < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
DELAY_LOG('#' << getFD() << " full - waiting for write\n");
DELAY_LOG('#' << fd() << " full - waiting for write\n");
}
else
{
DELAY_LOG('#' << getFD() << " failed onwards write "
DELAY_LOG('#' << fd() << " failed onwards write "
<< len << "bytes of "
<< chunk->getData().size()
<< " queue: " << _chunks.size() << " error: "
<< Util::symbolicErrno(errno) << ": " << strerror(errno) << '\n');
changeState(Closed);
changeState(State::Closed);
}
}
else
{
DELAY_LOG('#' << getFD() << " written onwards " << len << "bytes of "
DELAY_LOG('#' << fd() << " written onwards " << len << "bytes of "
<< chunk->getData().size()
<< " queue: " << _chunks.size() << '\n');
if (len > 0)
Expand All @@ -213,11 +220,11 @@

if (events & (POLLERR | POLLHUP | POLLNVAL))
{
DELAY_LOG('#' << getFD() << " error events: " << events << '\n');
changeState(Closed);
DELAY_LOG('#' << fd() << " error events: " << events << '\n');
changeState(State::Closed);
}

if (_state == Closed)
if (_state == State::Closed)
disposition.setClosed();
}
};
Expand Down Expand Up @@ -257,8 +264,9 @@
int internalFd = pair[0];
int delayFd = pair[1];

auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd);
auto internal = std::make_shared<DelaySocket>(delayMs, internalFd);
const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
auto physical = std::make_shared<DelaySocket>(delayMs, physicalFd, now);
auto internal = std::make_shared<DelaySocket>(delayMs, internalFd, now);
physical->setDestination(internal);
internal->setDestination(physical);

Expand Down
6 changes: 3 additions & 3 deletions net/HttpHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static void sendFileImpl(const std::shared_ptr<StreamSocket>& socket, const std:
FileUtil::Stat st(path);
if (st.bad())
{
LOG_WRN('#' << socket->getFD() << ": Failed to stat [" << path
LOG_WRN('#' << socket->fd() << ": Failed to stat [" << path
<< "]. File will not be sent.");
throw Poco::FileNotFoundException("Failed to stat [" + path + "]. File will not be sent.");
}
Expand Down Expand Up @@ -134,7 +134,7 @@ static void sendFileImpl(const std::shared_ptr<StreamSocket>& socket, const std:
if (!deflate || true)
{
response.setContentLength(st.size());
LOG_TRC('#' << socket->getFD() << ": Sending " << (headerOnly ? "header for " : "")
LOG_TRC('#' << socket->fd() << ": Sending " << (headerOnly ? "header for " : "")
<< " file [" << path << "].");
socket->send(response);

Expand All @@ -144,7 +144,7 @@ static void sendFileImpl(const std::shared_ptr<StreamSocket>& socket, const std:
else
{
response.set("Content-Encoding", "deflate");
LOG_TRC('#' << socket->getFD() << ": Sending " << (headerOnly ? "header for " : "")
LOG_TRC('#' << socket->fd() << ": Sending " << (headerOnly ? "header for " : "")
<< " file [" << path << "].");
socket->send(response);

Expand Down
Loading
Loading