Skip to content

Commit

Permalink
Fix bug #5662
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman committed Jan 24, 2025
1 parent 214334f commit cf82cce
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 53 deletions.
28 changes: 14 additions & 14 deletions src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,32 +317,26 @@ void ReactorThread::shutdown(Reactor *reactor) {
}
}

#ifdef SW_THREAD
if (serv->is_thread_mode()) {
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_worker(reactor->id));
reactor->del(socket);
serv->stop_async_worker(serv->get_worker(reactor->id));
return;
}
#endif

SW_LOOP_N(serv->worker_num) {
if (i % serv->reactor_num != reactor->id) {
continue;
if (i % serv->reactor_num == reactor->id) {
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i));
reactor->del(socket);
}
Socket *socket = message_bus.get_pipe_socket(serv->get_worker_pipe_master(i));
reactor->del(socket);
}

serv->foreach_connection([serv, reactor](Connection *conn) {
if (conn->fd % serv->reactor_num != reactor->id) {
return;
}
if (!conn->peer_closed && !conn->socket->removed) {
if (conn->fd % serv->reactor_num == reactor->id && !conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
}
});

if (serv->is_thread_mode()) {
serv->stop_async_worker(serv->get_worker(reactor->id));
}

reactor->set_wait_exit(true);
}

Expand Down Expand Up @@ -755,12 +749,14 @@ int ReactorThread::init(Server *serv, Reactor *reactor, uint16_t reactor_id) {
serv->init_reactor(reactor);
serv->init_pipe_sockets(&message_bus);

#ifdef SW_THREAD
if (serv->is_thread_mode()) {
Worker *worker = serv->get_worker(reactor_id);
serv->init_event_worker(worker);
auto pipe_worker = message_bus.get_pipe_socket(worker->pipe_worker);
reactor->add(pipe_worker, SW_EVENT_READ);
}
#endif

if (serv->pipe_command) {
auto pipe_socket = serv->pipe_command->get_socket(false);
Expand Down Expand Up @@ -812,9 +808,11 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) {
return;
}

#ifdef SW_THREAD
if (serv->is_thread_mode()) {
serv->call_worker_start_callback(serv->get_worker(reactor_id));
}
#endif

Reactor *reactor = sw_reactor();
if (thread->init(serv, reactor, reactor_id) < 0) {
Expand All @@ -827,9 +825,11 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) {
}
// main loop
swoole_event_wait();
#ifdef SW_THREAD
if (serv->is_thread_mode()) {
serv->call_worker_stop_callback(serv->get_worker(reactor_id));
}
#endif
thread->clean();
}

Expand Down
97 changes: 58 additions & 39 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ static int Worker_onPipeReceive(Reactor *reactor, Event *event);
static void Worker_reactor_try_to_exit(Reactor *reactor);

void Server::worker_signal_init(void) {
#ifdef SW_THREAD
if (is_thread_mode()) {
return;
}
#endif

swoole_signal_set(SIGHUP, nullptr);
swoole_signal_set(SIGPIPE, SIG_IGN);
swoole_signal_set(SIGUSR1, nullptr);
Expand Down Expand Up @@ -350,11 +353,13 @@ void Server::call_worker_error_callback(Worker *worker, const ExitStatus &status
}

bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) {
#ifdef SW_THREAD
if (is_thread_mode()) {
DataHead event = {};
event.type = SW_SERVER_EVENT_SHUTDOWN;
return send_to_worker_from_worker(get_worker(worker_id), &event, sizeof(event), SW_PIPE_MASTER) != -1;
}
#endif

if (worker_id == sw_worker()->id && !wait_reactor) {
if (swoole_event_is_available()) {
Expand All @@ -376,16 +381,13 @@ bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) {
}

void Server::stop_async_worker(Worker *worker) {
Reactor *reactor = SwooleTG.reactor;

worker->shutdown();
if (worker->type == SW_PROCESS_EVENTWORKER) {
reset_worker_counter(worker);
}

/**
* force to end.
*/
// forced termination
Reactor *reactor = SwooleTG.reactor;
if (reload_async == 0) {
reactor->running = false;
return;
Expand All @@ -400,34 +402,37 @@ void Server::stop_async_worker(Worker *worker) {
SwooleWG.worker_copy = new Worker{};
*SwooleWG.worker_copy = *worker;
SwooleWG.worker = worker;

if (worker->pipe_worker && !worker->pipe_worker->removed) {
reactor->remove_read_event(worker->pipe_worker);
auto pipe_worker = worker->pipe_worker;
#ifdef SW_THREAD
if (is_thread_mode()) {
pipe_worker = get_worker_message_bus()->get_pipe_socket(worker->pipe_worker);
}
#endif
if (pipe_worker && !pipe_worker->removed) {
reactor->remove_read_event(pipe_worker);
}

if (is_base_mode()) {
if (is_worker()) {
if (worker->id == 0 && gs->event_workers.running == 0) {
if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN)) {
swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN, this);
}
if (onBeforeShutdown) {
onBeforeShutdown(this);
}
}
for (auto ls : ports) {
reactor->del(ls->socket);
if (is_base_mode() && is_worker()) {
if (worker->id == 0 && gs->event_workers.running == 0) {
if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN)) {
swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_SERVER_SHUTDOWN, this);

Check warning on line 418 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L418

Added line #L418 was not covered by tests
}
if (worker->pipe_master && !worker->pipe_master->removed) {
reactor->remove_read_event(worker->pipe_master);
if (onBeforeShutdown) {
onBeforeShutdown(this);

Check warning on line 421 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L421

Added line #L421 was not covered by tests
}
foreach_connection([reactor](Connection *conn) {
if (!conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
}
});
clear_timer();
}
if (worker->pipe_master && !worker->pipe_master->removed) {
reactor->remove_read_event(worker->pipe_master);
}
for (auto ls : ports) {
reactor->del(ls->socket);
}
foreach_connection([reactor](Connection *conn) {
if (!conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
}
});
clear_timer();
} else if (is_process_mode()) {
WorkerStopMessage msg;
msg.pid = SwooleG.pid;
Expand All @@ -436,13 +441,24 @@ void Server::stop_async_worker(Worker *worker) {
if (gs->event_workers.push_message(SW_WORKER_MESSAGE_STOP, &msg, sizeof(msg)) < 0) {
swoole_sys_warning("failed to push WORKER_STOP message");
}
} else if (is_thread_mode()) {
}
#ifdef SW_THREAD
else if (is_thread_mode()) {
SW_LOOP_N(worker_num) {
if (i % reactor_num == reactor->id) {
auto pipe_master = get_worker_message_bus()->get_pipe_socket(get_worker_pipe_master(i));
reactor->remove_read_event(pipe_master);
}
}

foreach_connection([this, reactor](Connection *conn) {
if (conn->reactor_id == reactor->id && !conn->peer_closed && !conn->socket->removed) {
reactor->remove_read_event(conn->socket);
}
});
} else {
}
#endif
else {
assert(0);
}

Expand All @@ -455,21 +471,21 @@ void Server::stop_async_worker(Worker *worker) {

static void Worker_reactor_try_to_exit(Reactor *reactor) {
Server *serv;
if (swoole_get_process_type() == SW_PROCESS_TASKWORKER) {
if (sw_likely(swoole_get_process_type() != SW_PROCESS_TASKWORKER)) {
serv = (Server *) reactor->ptr;
} else {
ProcessPool *pool = (ProcessPool *) reactor->ptr;
serv = (Server *) pool->ptr;
} else {
serv = (Server *) reactor->ptr;
}
uint8_t call_worker_exit_func = 0;

bool has_call_worker_exit_func = false;
while (1) {
if (reactor->if_exit()) {
reactor->running = false;
} else {
if (serv->onWorkerExit && call_worker_exit_func == 0) {
if (serv->onWorkerExit && !has_call_worker_exit_func) {
has_call_worker_exit_func = true;

Check warning on line 487 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L487

Added line #L487 was not covered by tests
serv->onWorkerExit(serv, sw_worker());
call_worker_exit_func = 1;
continue;
}
int remaining_time = serv->max_wait_time - (::time(nullptr) - SwooleWG.exit_time);
Expand Down Expand Up @@ -504,18 +520,21 @@ void Server::drain_worker_pipe() {

void Server::clean_worker_connections(Worker *worker) {
sw_reactor()->destroyed = true;
if (sw_likely(is_base_mode())) {
foreach_connection([this](Connection *conn) { close(conn->session_id, true); });
return;

Check warning on line 525 in src/server/worker.cc

View check run for this annotation

Codecov / codecov/patch

src/server/worker.cc#L524-L525

Added lines #L524 - L525 were not covered by tests
}

#ifdef SW_THREAD
if (is_thread_mode()) {
foreach_connection([this, worker](Connection *conn) {
if (conn->reactor_id == worker->id) {
close(conn->session_id, true);
}
});
} else if (is_base_mode()) {
foreach_connection([this](Connection *conn) { close(conn->session_id, true); });
} else {
return;
}
#endif
}

/**
Expand Down
44 changes: 44 additions & 0 deletions tests/swoole_thread/server/bug_5662.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
--TEST--
swoole_thread/server: Github #5662
--SKIPIF--
<?php
require __DIR__ . '/../../include/skipif.inc';
skip_if_nts();
?>
--FILE--
<?php
require __DIR__ . '/../../include/bootstrap.php';

use Swoole\Thread;
use Swoole\Thread\Queue;

$port = get_constant_port(__FILE__);
$server = new Swoole\Http\Server('127.0.0.1', $port, SWOOLE_THREAD);
$server->set([
'log_file' => '/dev/null',
'worker_num' => 2,
'max_request' => 5,
'init_arguments' => function () {
global $queue;
$queue = new Queue();
return [$queue];
}
]);
$server->on('WorkerStart', function (Swoole\Server $server, $workerId) {
[$queue] = Thread::getArguments();
$queue->push('start', Queue::NOTIFY_ALL);
});
$server->addProcess(new Swoole\Process(function ($process) use ($server, $port) {
[$queue] = Thread::getArguments();
Assert::true($queue->pop(-1) == 'start');
for ($i = 0; $i < 20; $i++) {
Assert::true(file_get_contents("http://127.0.0.1:{$port}/") == 'OK');
}
$server->shutdown();
}));
$server->on('request', function (Swoole\Http\Request $request, Swoole\Http\Response $response) {
$response->end('OK');
});
$server->start();
?>
--EXPECT--

0 comments on commit cf82cce

Please sign in to comment.