From db51e622ab5cd3738d114a9ce05f4d86cccd2903 Mon Sep 17 00:00:00 2001 From: Winston-leon <1871056255@qq.com> Date: Thu, 7 Apr 2022 09:35:47 +0800 Subject: [PATCH 1/2] * Provide threadpool plugin for mysql-8.0.25 - Add admin connection related functions; - Improve threadpool plugin uninstall flow --- plugin/thread_pool/threadpool.h | 5 +- plugin/thread_pool/threadpool_common.cc | 120 +++++----- plugin/thread_pool/threadpool_unix.cc | 285 +++++++++++++++--------- plugin/thread_pool/threadpool_unix.h | 6 +- 4 files changed, 243 insertions(+), 173 deletions(-) diff --git a/plugin/thread_pool/threadpool.h b/plugin/thread_pool/threadpool.h index d948fd7369ea..0dc80e019c66 100644 --- a/plugin/thread_pool/threadpool.h +++ b/plugin/thread_pool/threadpool.h @@ -40,8 +40,6 @@ extern uint threadpool_max_threads; extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/ extern uint threadpool_oversubscribe; /* Maximum active threads in group */ extern uint threadpool_toobusy; /* Maximum active and waiting threads in group */ -extern uint threadpool_high_prio_tickets; -extern ulong threadpool_high_prio_mode; /* Possible values for thread_pool_high_prio_mode */ extern const char *threadpool_high_prio_mode_names[]; @@ -81,5 +79,8 @@ extern TP_STATISTICS tp_stats; extern void tp_set_threadpool_size(uint val) noexcept; extern void tp_set_threadpool_stall_limit(uint val) noexcept; +extern uint tp_get_thdvar_high_prio_tickets(THD *thd); +extern uint tp_get_thdvar_high_prio_mode(THD *thd); + #endif // THREADPOOL_H_ diff --git a/plugin/thread_pool/threadpool_common.cc b/plugin/thread_pool/threadpool_common.cc index 3d07c0b3ce8e..c2cca606c3db 100644 --- a/plugin/thread_pool/threadpool_common.cc +++ b/plugin/thread_pool/threadpool_common.cc @@ -14,11 +14,11 @@ along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ - - #include "threadpool.h" #include "threadpool_unix.h" #include "my_thread_local.h" +#include "my_sys.h" +#include "mysql/plugin.h" #include "mysql/psi/mysql_idle.h" #include "mysql/thread_pool_priv.h" #include "sql/debug_sync.h" @@ -27,15 +27,12 @@ #include "sql/sql_connect.h" #include "sql/protocol_classic.h" #include "sql/sql_parse.h" -#include "mysql/plugin.h" #include "sql/sql_table.h" #include "sql/field.h" #include "sql/sql_show.h" #include "sql/sql_class.h" -#include "my_sys.h" -#include "mysql/plugin.h" #include - +#include #define MYSQL_SERVER 1 @@ -46,8 +43,6 @@ uint threadpool_stall_limit; uint threadpool_max_threads; uint threadpool_oversubscribe; uint threadpool_toobusy; -uint threadpool_high_prio_tickets; -ulong threadpool_high_prio_mode; /* Stats */ TP_STATISTICS tp_stats; @@ -79,7 +74,7 @@ class Worker_thread_context { #ifdef HAVE_PSI_THREAD_INTERFACE PSI_thread *const psi_thread; #endif -#ifndef DBUG_OFF +#ifndef NDEBUG const my_thread_id thread_id; #endif public: @@ -88,7 +83,7 @@ class Worker_thread_context { #ifdef HAVE_PSI_THREAD_INTERFACE psi_thread(PSI_THREAD_CALL(get_thread)()) #endif -#ifndef DBUG_OFF +#ifndef NDEBUG , thread_id(my_thread_var_id()) #endif @@ -99,7 +94,7 @@ class Worker_thread_context { #ifdef HAVE_PSI_THREAD_INTERFACE PSI_THREAD_CALL(set_thread)(psi_thread); #endif -#ifndef DBUG_OFF +#ifndef NDEBUG set_my_thread_var_id(thread_id); #endif THR_MALLOC = nullptr; @@ -110,7 +105,7 @@ class Worker_thread_context { Attach/associate the connection with the OS thread, */ static bool thread_attach(THD *thd) { -#ifndef DBUG_OFF +#ifndef NDEBUG set_my_thread_var_id(thd->thread_id()); #endif thd->thread_stack = (char *)&thd; @@ -171,7 +166,6 @@ int threadpool_add_connection(THD *thd) { if (thd_connection_alive(thd)) { retval = 0; thd_set_net_read_write(thd, 1); - //thd->skip_wait_timeout = true; // !! todo MYSQL_SOCKET_SET_STATE(thd->get_protocol_classic()->get_vio()->mysql_socket, PSI_SOCKET_STATE_IDLE); thd->m_server_idle = true; @@ -305,30 +299,30 @@ static inline int my_getncpus() noexcept { #endif } -static MYSQL_SYSVAR_UINT(threadpool_idle_timeout, threadpool_idle_timeout, +static MYSQL_SYSVAR_UINT(idle_timeout, threadpool_idle_timeout, PLUGIN_VAR_RQCMDARG, "Timeout in seconds for an idle thread in the thread pool." "Worker thread will be shut down after timeout", NULL, NULL, 60, 1, UINT_MAX, 1); -static MYSQL_SYSVAR_UINT(threadpool_oversubscribe, threadpool_oversubscribe, +static MYSQL_SYSVAR_UINT(oversubscribe, threadpool_oversubscribe, PLUGIN_VAR_RQCMDARG, "How many additional active worker threads in a group are allowed.", NULL, NULL, 3, 1, 1000, 1); -static MYSQL_SYSVAR_UINT(threadpool_toobusy, threadpool_toobusy, +static MYSQL_SYSVAR_UINT(toobusy, threadpool_toobusy, PLUGIN_VAR_RQCMDARG, "How many additional active and waiting worker threads in a group are allowed.", NULL, NULL, 13, 1, 1000, 1); -static MYSQL_SYSVAR_UINT(threadpool_size, threadpool_size, +static MYSQL_SYSVAR_UINT(size, threadpool_size, PLUGIN_VAR_RQCMDARG, "Number of thread groups in the pool. " "This parameter is roughly equivalent to maximum number of concurrently " "executing threads (threads in a waiting state do not count as executing).", NULL, fix_threadpool_size, (uint)my_getncpus(), 1, MAX_THREAD_GROUPS, 1); -static MYSQL_SYSVAR_UINT(threadpool_stall_limit, threadpool_stall_limit, +static MYSQL_SYSVAR_UINT(stall_limit, threadpool_stall_limit, PLUGIN_VAR_RQCMDARG, "Maximum query execution time in milliseconds," "before an executing non-yielding thread is considered stalled." @@ -336,34 +330,7 @@ static MYSQL_SYSVAR_UINT(threadpool_stall_limit, threadpool_stall_limit, "may be created to handle remaining clients.", NULL, fix_threadpool_stall_limit, 500, 10, UINT_MAX, 1); -static MYSQL_SYSVAR_UINT(threadpool_high_prio_tickets, threadpool_high_prio_tickets, - PLUGIN_VAR_RQCMDARG, - "Number of tickets to enter the high priority event queue for each " - "transaction.", - NULL, NULL , UINT_MAX, 0, UINT_MAX, 1); - -const char *threadpool_high_prio_mode_names[] = {"transactions", "statements", - "none", NullS}; -TYPELIB threadpool_high_prio_mode_typelib = -{ - array_elements(threadpool_high_prio_mode_names) - 1, "threadpool_high_prio_mode", - threadpool_high_prio_mode_names, NULL -}; - -static MYSQL_SYSVAR_ENUM(threadpool_high_prio_mode, threadpool_high_prio_mode, - PLUGIN_VAR_RQCMDARG, - "High priority queue mode: one of 'transactions', 'statements' or 'none'. " - "In the 'transactions' mode the thread pool uses both high- and low-priority " - "queues depending on whether an event is generated by an already started " - "transaction and whether it has any high priority tickets (see " - "thread_pool_high_prio_tickets). In the 'statements' mode all events (i.e. " - "individual statements) always go to the high priority queue, regardless of " - "the current transaction state and high priority tickets. " - "'none' is the opposite of 'statements', i.e. disables the high priority queue " - "completely.", - NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib); - -static MYSQL_SYSVAR_UINT(threadpool_max_threads, threadpool_max_threads, +static MYSQL_SYSVAR_UINT(max_threads, threadpool_max_threads, PLUGIN_VAR_RQCMDARG, "Maximum allowed number of worker threads in the thread pool", NULL, NULL, MAX_CONNECTIONS, 1, MAX_CONNECTIONS, 1); @@ -377,24 +344,55 @@ static int threadpool_plugin_init(void *) DBUG_RETURN(0); } - static int threadpool_plugin_deinit(void *) { DBUG_ENTER("threadpool_plugin_deinit"); - my_connection_handler_reset(); DBUG_RETURN(0); } -static struct SYS_VAR* system_variables[] = { - MYSQL_SYSVAR(threadpool_idle_timeout), - MYSQL_SYSVAR(threadpool_size), - MYSQL_SYSVAR(threadpool_max_threads), - MYSQL_SYSVAR(threadpool_stall_limit), - MYSQL_SYSVAR(threadpool_oversubscribe), - MYSQL_SYSVAR(threadpool_toobusy), - MYSQL_SYSVAR(threadpool_high_prio_tickets), - MYSQL_SYSVAR(threadpool_high_prio_mode), +static MYSQL_THDVAR_UINT(high_prio_tickets, + PLUGIN_VAR_NOCMDARG, + "Number of tickets to enter the high priority event queue for each " + "transaction.", + NULL, NULL, UINT_MAX, 0, UINT_MAX, 1); + +const char *threadpool_high_prio_mode_names[] = {"transactions", "statements", + "none", NullS}; +TYPELIB threadpool_high_prio_mode_typelib = { + array_elements(threadpool_high_prio_mode_names) - 1, "", + threadpool_high_prio_mode_names, NULL +}; + +static MYSQL_THDVAR_ENUM(high_prio_mode, + PLUGIN_VAR_NOCMDARG, + "High priority queue mode: one of 'transactions', 'statements' or 'none'. " + "In the 'transactions' mode the thread pool uses both high- and low-priority " + "queues depending on whether an event is generated by an already started " + "transaction and whether it has any high priority tickets (see " + "thread_pool_high_prio_tickets). In the 'statements' mode all events (i.e. " + "individual statements) always go to the high priority queue, regardless of " + "the current transaction state and high priority tickets. " + "'none' is the opposite of 'statements', i.e. disables the high priority queue " + "completely.", + NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib); + +static uint &idle_timeout = threadpool_idle_timeout; +static uint &size = threadpool_size; +static uint &stall_limit = threadpool_stall_limit; +static uint &max_threads = threadpool_max_threads; +static uint &oversubscribe = threadpool_oversubscribe; +static uint &toobusy = threadpool_toobusy; + +SYS_VAR *system_variables[] = { + MYSQL_SYSVAR(idle_timeout), + MYSQL_SYSVAR(size), + MYSQL_SYSVAR(max_threads), + MYSQL_SYSVAR(stall_limit), + MYSQL_SYSVAR(oversubscribe), + MYSQL_SYSVAR(toobusy), + MYSQL_SYSVAR(high_prio_tickets), + MYSQL_SYSVAR(high_prio_mode), NULL }; @@ -741,3 +739,11 @@ mysql_declare_plugin(thread_pool) } mysql_declare_plugin_end; +uint tp_get_thdvar_high_prio_tickets(THD *thd) { + return THDVAR(thd, high_prio_tickets); +} + +uint tp_get_thdvar_high_prio_mode(THD *thd) { + return THDVAR(thd, high_prio_mode); +} + diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index c81bf7dd1e19..8501da336869 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -13,17 +13,17 @@ You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#include - -#include -#include "my_sys.h" -#include "my_systime.h" -#include "mysql/thread_pool_priv.h" // thd_is_transaction_active() +#include "threadpool_unix.h" #include "sql/debug_sync.h" #include "sql/log.h" #include "sql/protocol_classic.h" +#include "my_sys.h" +#include "my_systime.h" +#include "mysql/thread_pool_priv.h" // thd_is_transaction_active() +#include "mysql/plugin.h" #include "threadpool.h" -#include "threadpool_unix.h" +#include +#include #define MYSQL_SERVER 1 @@ -85,15 +85,16 @@ struct pool_timer_t { static pool_timer_t pool_timer; +static std::atomic threadpool_thds(0); + static void queue_put(thread_group_t *thread_group, connection_t *connection); static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept; static void handle_event(connection_t *connection); static int wake_or_create_thread(thread_group_t *thread_group, - bool due_to_stall = false, - bool admin_connection = false); -static int create_worker(thread_group_t *thread_group, bool due_to_stall, - bool admin_connection = false) noexcept; + bool due_to_stall = false); +static int create_worker(thread_group_t *thread_group, bool due_to_stall) noexcept; +static void *admin_port_worker_main(void *param); static void *worker_main(void *param); static void check_stall(thread_group_t *thread_group); static void connection_abort(connection_t *connection); @@ -262,7 +263,6 @@ namespace { Prevent too many active threads executing at the same time, if the workload is not CPU bound. */ - inline bool too_many_active_threads( const thread_group_t &thread_group) noexcept { return (thread_group.active_thread_count >= @@ -271,14 +271,13 @@ inline bool too_many_active_threads( } /* - Limit the number of 'busy' threads by 1 + thread_pool_oversubscribe. A thread + Limit the number of 'busy' threads by 1 + threadpool_toobusy. A thread is busy if it is in either the active state or the waiting state (i.e. between thd_wait_begin() / thd_wait_end() calls). */ - inline bool too_many_busy_threads(const thread_group_t &thread_group) noexcept { return (thread_group.active_thread_count + thread_group.waiting_thread_count > - 1 + (int)threadpool_oversubscribe); + 1 + (int)threadpool_toobusy); } /* @@ -286,10 +285,8 @@ inline bool too_many_busy_threads(const thread_group_t &thread_group) noexcept { based on its current thread_pool_high_prio_mode value, available high priority tickets and transactional state and whether any locks are held. */ - inline bool connection_is_high_prio(const connection_t &c) noexcept { - // const ulong mode = c.thd->variables.threadpool_high_prio_mode; - const ulong mode = c.threadpool_high_prio_mode; // !! todo + const ulong mode = tp_get_thdvar_high_prio_mode(c.thd); return (mode == TP_HIGH_PRIO_MODE_STATEMENTS) || (mode == TP_HIGH_PRIO_MODE_TRANSACTIONS && c.tickets > 0 && @@ -304,9 +301,7 @@ inline bool connection_is_high_prio(const connection_t &c) noexcept { } // namespace - /* Dequeue element from a workqueue */ - static connection_t *queue_get(thread_group_t *thread_group) noexcept { DBUG_ENTER("queue_get"); thread_group->queue_event_count++; @@ -379,7 +374,6 @@ class Thd_timeout_checker : public Do_THD_Impl { Find connections that have been idle for too long and kill them. Also, recalculate time when next timeout check should run. */ - static void timeout_check(pool_timer_t *timer) { DBUG_ENTER("timeout_check"); @@ -408,7 +402,6 @@ static void timeout_check(pool_timer_t *timer) { TODO: Let the timer sleep for long time if there is no work to be done. Currently it wakes up rather often on and idle server. */ - static void *timer_thread(void *param) noexcept { my_thread_init(); DBUG_ENTER("timer_thread"); @@ -447,7 +440,7 @@ static void *timer_thread(void *param) noexcept { mysql_mutex_destroy(&timer->mutex); my_thread_end(); - return NULL; + return nullptr; } /* @@ -524,10 +517,10 @@ static void check_stall(thread_group_t *thread_group) { static void start_timer(pool_timer_t *timer) noexcept { my_thread_handle thread_id; DBUG_ENTER("start_timer"); - mysql_mutex_init(key_timer_mutex, &timer->mutex, NULL); + mysql_mutex_init(key_timer_mutex, &timer->mutex, nullptr); mysql_cond_init(key_timer_cond, &timer->cond); timer->shutdown = false; - mysql_thread_create(key_timer_thread, &thread_id, NULL, timer_thread, timer); + mysql_thread_create(key_timer_thread, &thread_id, nullptr, timer_thread, timer); DBUG_VOID_RETURN; } @@ -630,8 +623,7 @@ static connection_t *listener(thread_group_t *thread_group) { c->tickets--; thread_group->high_prio_queue.push_back(c); } else { - // c->tickets = c->thd->variables.threadpool_high_prio_tickets; - c->tickets = c->threadpool_high_prio_tickets; // !! todo + c->tickets = tp_get_thdvar_high_prio_tickets(c->thd); queue_push(thread_group, c); } } @@ -680,7 +672,6 @@ static connection_t *listener(thread_group_t *thread_group) { @param count - 1, when new thread is created -1, when thread is about to exit */ - static void add_thread_count(thread_group_t *thread_group, int32 count) noexcept { thread_group->thread_count += count; @@ -697,17 +688,14 @@ static void add_thread_count(thread_group_t *thread_group, threadpool_max_threads, because we need at least 2 threads per group to prevent deadlocks (one listener + one worker) */ - static int create_worker(thread_group_t *thread_group, - bool due_to_stall, - bool admin_connection) noexcept { + bool due_to_stall) noexcept { my_thread_handle thread_id; bool max_threads_reached = false; int err; DBUG_ENTER("create_worker"); - if (!admin_connection && - tp_stats.num_worker_threads.load(std::memory_order_relaxed) >= + if (tp_stats.num_worker_threads.load(std::memory_order_relaxed) >= (int)threadpool_max_threads && thread_group->thread_count >= 2) { err = 1; @@ -723,8 +711,8 @@ static int create_worker(thread_group_t *thread_group, Global_THD_manager::get_instance()->inc_thread_created(); add_thread_count(thread_group, 1); TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations); - if (due_to_stall) - { + + if (due_to_stall) { TP_INCREMENT_GROUP_COUNTER(thread_group, thread_creations_due_to_stall); } } else { @@ -732,10 +720,11 @@ static int create_worker(thread_group_t *thread_group, } end: - if (err) + if (err) { print_pool_blocked_message(max_threads_reached); - else + } else { pool_block_start = 0; /* Reset pool blocked timer, if it was set */ + } DBUG_RETURN(err); } @@ -770,12 +759,9 @@ static ulonglong microsecond_throttling_interval( Worker creation is throttled, so we avoid too many threads to be created during the short time. - - If admin_connection is true, a new thread is created regardless of any other - limits. */ static int wake_or_create_thread(thread_group_t *thread_group, - bool due_to_stall, bool admin_connection) { + bool due_to_stall) { DBUG_ENTER("wake_or_create_thread"); if (thread_group->shutdown) DBUG_RETURN(0); @@ -785,14 +771,14 @@ static int wake_or_create_thread(thread_group_t *thread_group, if (thread_group->thread_count > thread_group->connection_count) DBUG_RETURN(-1); - if (thread_group->active_thread_count == 0 || admin_connection) { + if (thread_group->active_thread_count == 0) { /* We're better off creating a new thread here with no delay, either there are no workers at all, or they all are all blocking and there was no idle thread to wakeup. Smells like a potential deadlock or very slowly executing requests, e.g sleeps or user locks. */ - DBUG_RETURN(create_worker(thread_group, due_to_stall, admin_connection)); + DBUG_RETURN(create_worker(thread_group, due_to_stall)); } const ulonglong now = my_microsecond_getsystime(); @@ -813,10 +799,20 @@ static int thread_group_init(thread_group_t *thread_group, pthread_attr_t *thread_attr) noexcept { DBUG_ENTER("thread_group_init"); thread_group->pthread_attr = thread_attr; - mysql_mutex_init(key_group_mutex, &thread_group->mutex, NULL); + mysql_mutex_init(key_group_mutex, &thread_group->mutex, nullptr); thread_group->pollfd = -1; thread_group->shutdown_pipe[0] = -1; thread_group->shutdown_pipe[1] = -1; + thread_group->thread_count = 0; + thread_group->admin_port_thread_count = 0; + thread_group->dump_thread_count = 0; + thread_group->active_thread_count = 0; + thread_group->connection_count = 0; + thread_group->waiting_thread_count = 0; + thread_group->io_event_count = 0; + thread_group->queue_event_count = 0; + thread_group->shutdown = false; + thread_group->stalled = false; DBUG_RETURN(0); } @@ -837,7 +833,6 @@ static void thread_group_destroy(thread_group_t *thread_group) noexcept { /** Wake sleeping thread from waiting list */ - static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept { DBUG_ENTER("wake_thread"); worker_thread_t *thread = thread_group->waiting_threads.front(); @@ -857,7 +852,6 @@ static int wake_thread(thread_group_t *thread_group, bool due_to_stall) noexcept /** Shutdown for thread group */ - static void thread_group_close(thread_group_t *thread_group) noexcept { DBUG_ENTER("thread_group_close"); @@ -877,8 +871,8 @@ static void thread_group_close(thread_group_t *thread_group) noexcept { } /* Wake listener */ - if (io_poll_associate_fd(thread_group->pollfd, thread_group->shutdown_pipe[0], - nullptr)) { + if (io_poll_associate_fd(thread_group->pollfd, + thread_group->shutdown_pipe[0], nullptr)) { mysql_mutex_unlock(&thread_group->mutex); DBUG_VOID_RETURN; } @@ -912,21 +906,19 @@ static void thread_group_close(thread_group_t *thread_group) noexcept { Currently, this function is only used when new connections need to perform login (this is done in worker threads). - */ - static void queue_put(thread_group_t *thread_group, connection_t *connection) { DBUG_ENTER("queue_put"); mysql_mutex_lock(&thread_group->mutex); - // connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); connection->enqueue_time = pool_timer.current_microtime; - connection->tickets = connection->threadpool_high_prio_tickets; // !! todo queue_push(thread_group, connection); - if (thread_group->active_thread_count == 0) - wake_or_create_thread(thread_group, false, connection->thd->is_admin_connection()); + if (thread_group->active_thread_count == 0) { + wake_or_create_thread(thread_group, false); + } mysql_mutex_unlock(&thread_group->mutex); @@ -940,7 +932,7 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) { (if connection is not yet logged in), or there are unread bytes on the socket. If there are no pending events currently, thread will wait. - If timeout specified in abstime parameter passes, the function returns NULL. + If timeout specified in abstime parameter passes, the function returns nullptr. @param current_thread - current worker thread @param thread_group - current thread group @@ -948,9 +940,8 @@ static void queue_put(thread_group_t *thread_group, connection_t *connection) { @return connection with pending event. - NULL is returned if timeout has expired,or on shutdown. + nullptr is returned if timeout has expired,or on shutdown. */ - static connection_t *get_event(worker_thread_t *current_thread, thread_group_t *thread_group, struct timespec *abstime) { @@ -971,7 +962,7 @@ static connection_t *get_event(worker_thread_t *current_thread, if (connection) break; } - /* If there is currently no listener in the group, become one. */ + /* If there is currently no listener in the group, become one. */ if (!thread_group->listener) { thread_group->listener = current_thread; thread_group->active_thread_count--; @@ -1011,12 +1002,7 @@ static connection_t *get_event(worker_thread_t *current_thread, Not eligible for high priority processing. Restore tickets and put it into the low priority queue. */ - - // connection->tickets = - // connection->thd->variables.threadpool_high_prio_tickets; - connection->tickets = - connection->threadpool_high_prio_tickets; // !! todo - + connection->tickets = tp_get_thdvar_high_prio_tickets(connection->thd); thread_group->queue.push_back(connection); connection = nullptr; } @@ -1098,7 +1084,6 @@ static void wait_begin(thread_group_t *thread_group) noexcept { /** Tells the pool has finished waiting. */ - static void wait_end(thread_group_t *thread_group) noexcept { DBUG_ENTER("wait_end"); mysql_mutex_lock(&thread_group->mutex); @@ -1118,7 +1103,7 @@ static connection_t *alloc_connection(THD *thd) noexcept { connection_t *connection = (connection_t *)my_malloc( PSI_NOT_INSTRUMENTED /*key_memory_thread_pool_connection*/, - sizeof(connection_t), 0); // !! todo + sizeof(connection_t), 0); if (connection) { connection->thd = thd; connection->waiting = false; @@ -1148,8 +1133,9 @@ bool tp_add_connection( connection_t *const connection = alloc_connection(thd); if (unlikely(!connection)) { - // thd->get_protocol_classic()->end_net(); // !! todo + thd->get_protocol_classic()->end_net(); delete thd; + // channel will be closed by send_error_and_close_channel() channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false); DBUG_RETURN(true); } @@ -1159,6 +1145,8 @@ bool tp_add_connection( thd->set_new_thread_id(); thd->start_utime = my_micro_time(); + threadpool_thds++; + Global_THD_manager::get_instance()->add_thd(thd); thd->scheduler.data = connection; @@ -1168,33 +1156,53 @@ bool tp_add_connection( connection->thread_group = group; - mysql_mutex_lock(&group->mutex); - group->connection_count++; - mysql_mutex_unlock(&group->mutex); + if (thd->is_admin_connection()) { + my_thread_handle thread_id; + mysql_mutex_lock(&group->mutex); + int err = mysql_thread_create(key_worker_thread, &thread_id, + group->pthread_attr, admin_port_worker_main, connection); + + if (err) { + set_my_errno(errno); + print_pool_blocked_message(false); + } else { + group->admin_port_thread_count++; + } + mysql_mutex_unlock(&group->mutex); + } else { + mysql_mutex_lock(&group->mutex); + group->connection_count++; + mysql_mutex_unlock(&group->mutex); + + /* + Add connection to the work queue. Actual login + will be done by a worker thread. + */ + queue_put(group, connection); + } - /* - Add connection to the work queue. Actual login - will be done by a worker thread. - */ - queue_put(group, connection); DBUG_RETURN(false); } /** Terminate connection. */ - static void connection_abort(connection_t *connection) { DBUG_ENTER("connection_abort"); thread_group_t *group = connection->thread_group; - + bool is_admin_port = connection->thd->is_admin_connection(); + void *tmp_thd = static_cast(connection->thd); + tmp_thd = tmp_thd; threadpool_remove_connection(connection->thd); - mysql_mutex_lock(&group->mutex); - group->connection_count--; - mysql_mutex_unlock(&group->mutex); + if (!is_admin_port) { + mysql_mutex_lock(&group->mutex); + group->connection_count--; + mysql_mutex_unlock(&group->mutex); + } my_free(connection); + threadpool_thds--; DBUG_VOID_RETURN; } @@ -1204,7 +1212,9 @@ static void connection_abort(connection_t *connection) { void tp_post_kill_notification(THD *thd) noexcept { DBUG_ENTER("tp_post_kill_notification"); - if (current_thd == thd || thd->system_thread) DBUG_VOID_RETURN; + if (current_thd == thd || thd->system_thread) { + DBUG_VOID_RETURN; + } Vio *vio = thd->get_protocol_classic()->get_vio(); if (vio) vio_cancel(vio, SHUT_RD); @@ -1216,7 +1226,6 @@ alignas(CPU_LEVEL1_DCACHE_LINESIZE) std::atomic tp_waits[THD_WAIT_LAST /** MySQL scheduler callback: wait begin */ - void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) { DBUG_ENTER("tp_wait_begin"); @@ -1225,7 +1234,9 @@ void tp_wait_begin(THD *thd, int type MY_ATTRIBUTE((unused))) { } connection_t *connection = (connection_t *)thd->scheduler.data; - if (connection) { + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { assert(!connection->waiting); connection->waiting = true; assert(type > 0 && type < THD_WAIT_LAST); @@ -1246,7 +1257,9 @@ void tp_wait_end(THD *thd) { DBUG_VOID_RETURN; } connection_t *connection = (connection_t *)thd->scheduler.data; - if (connection) { + + if (connection && connection->thd && + !connection->thd->is_admin_connection()) { assert(connection->waiting); connection->waiting = false; wait_end(connection->thread_group); @@ -1359,36 +1372,74 @@ static int start_io(connection_t *connection) { static void handle_event(connection_t *connection) { DBUG_ENTER("handle_event"); - int err; + int err = 0; - if (!connection->logged_in) { - err = threadpool_add_connection(connection->thd); - connection->logged_in = true; - } else { - err = threadpool_process_request(connection->thd); - } + while (1) { + if (!connection->logged_in) { + err = threadpool_add_connection(connection->thd); + connection->logged_in = true; + } else { + err = threadpool_process_request(connection->thd); + } - if (err) goto end; + if (err) { + goto end; + } - set_wait_timeout(connection); - err = start_io(connection); + set_wait_timeout(connection); + + if (connection->thd->is_admin_connection()) { + } else { + break; + } + } + + if (!connection->thd->is_admin_connection()) { + err = start_io(connection); + } end: - if (err) connection_abort(connection); + if (err || connection->thd->is_admin_connection()) { + connection_abort(connection); + } DBUG_VOID_RETURN; } +static void *admin_port_worker_main(void *param) { + my_thread_init(); + DBUG_ENTER("admin_port_worker_main"); + +#ifdef HAVE_PSI_THREAD_INTERFACE + PSI_THREAD_CALL(set_thread_account) + (nullptr, 0, nullptr, 0); +#endif + + connection_t *connection = static_cast(param); + assert(connection != nullptr); + assert(connection->thread_group != nullptr); + thread_group_t *group = connection->thread_group; + + handle_event(connection); + + mysql_mutex_lock(&group->mutex); + group->admin_port_thread_count--; + mysql_mutex_unlock(&group->mutex); + + my_thread_end(); + return nullptr; +} + /** Worker thread's main */ - static void *worker_main(void *param) { my_thread_init(); DBUG_ENTER("worker_main"); - thread_group_t *thread_group = (thread_group_t *)param; + thread_group_t *thread_group = static_cast(param); + assert(thread_group != nullptr); /* Init per-thread structure */ worker_thread_t this_thread; @@ -1398,16 +1449,19 @@ static void *worker_main(void *param) { #ifdef HAVE_PSI_THREAD_INTERFACE PSI_THREAD_CALL(set_thread_account) - (NULL, 0, NULL, 0); + (nullptr, 0, nullptr, 0); #endif /* Run event loop */ for (;;) { - connection_t *connection; struct timespec ts; set_timespec(&ts, threadpool_idle_timeout); - connection = get_event(&this_thread, thread_group, &ts); - if (!connection) break; + connection_t *connection = get_event(&this_thread, thread_group, &ts); + + if (!connection) { + break; + } + this_thread.event_count++; handle_event(connection); } @@ -1447,21 +1501,32 @@ bool tp_init() { DBUG_RETURN(false); } -void tp_end() { - DBUG_ENTER("tp_end"); +void tp_end_thread() { + if (!threadpool_started) { + return; + } - if (!threadpool_started) DBUG_VOID_RETURN; + while (threadpool_thds > 0) { + sleep(2); + } stop_timer(&pool_timer); + for (uint i = 0; i < array_elements(all_groups); i++) { thread_group_close(&all_groups[i]); } + threadpool_started = false; +} + +void tp_end() { + DBUG_ENTER("tp_end"); + std::thread exit_tp(tp_end_thread); + exit_tp.detach(); DBUG_VOID_RETURN; } /** Ensure that poll descriptors are created when threadpool_size changes */ - void tp_set_threadpool_size(uint size) noexcept { if (!threadpool_started) return; @@ -1487,7 +1552,10 @@ void tp_set_threadpool_size(uint size) noexcept { } void tp_set_threadpool_stall_limit(uint limit) noexcept { - if (!threadpool_started) return; + if (!threadpool_started) { + return; + } + mysql_mutex_lock(&(pool_timer.mutex)); pool_timer.tick_interval = limit; mysql_mutex_unlock(&(pool_timer.mutex)); @@ -1500,7 +1568,6 @@ void tp_set_threadpool_stall_limit(uint limit) noexcept { Sum idle threads over all groups. Don't do any locking, it is not required for stats. */ - int tp_get_idle_thread_count() noexcept { int sum = 0; for (uint i = 0; @@ -1536,10 +1603,9 @@ A likely cause of pool blocks are clients that lock resources for long time. \ It will be just a single message for each blocking situation (to prevent log flood). */ - static void print_pool_blocked_message(bool max_threads_reached) noexcept { ulonglong now = my_microsecond_getsystime(); - static bool msg_written; + static bool msg_written = false; if (pool_block_start == 0) { pool_block_start = now; @@ -1561,4 +1627,3 @@ static void print_pool_blocked_message(bool max_threads_reached) noexcept { msg_written = true; } } - diff --git a/plugin/thread_pool/threadpool_unix.h b/plugin/thread_pool/threadpool_unix.h index 56cceae68d9b..e9a27baa04a9 100644 --- a/plugin/thread_pool/threadpool_unix.h +++ b/plugin/thread_pool/threadpool_unix.h @@ -69,9 +69,6 @@ struct connection_t { bool bound_to_poll_descriptor; bool waiting; uint tickets; - - uint threadpool_high_prio_tickets; - ulong threadpool_high_prio_mode; }; typedef I_P_List Date: Mon, 18 Apr 2022 20:31:24 +0800 Subject: [PATCH 2/2] Review opinion accepted. --- plugin/thread_pool/threadpool_unix.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin/thread_pool/threadpool_unix.cc b/plugin/thread_pool/threadpool_unix.cc index 8501da336869..f1c575c729a5 100644 --- a/plugin/thread_pool/threadpool_unix.cc +++ b/plugin/thread_pool/threadpool_unix.cc @@ -1389,7 +1389,6 @@ static void handle_event(connection_t *connection) { set_wait_timeout(connection); if (connection->thd->is_admin_connection()) { - } else { break; } }