Skip to content

Commit

Permalink
Merge pull request #257 from Winston-leon/mysql-8.0.25-thread-pool
Browse files Browse the repository at this point in the history
* Provide threadpool plugin for mysql-8.0.25
  • Loading branch information
liuwei-ck authored Apr 18, 2022
2 parents 91f5751 + 89efec4 commit 1c0209b
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 173 deletions.
5 changes: 3 additions & 2 deletions plugin/thread_pool/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ extern uint threadpool_max_threads;
extern uint threadpool_stall_limit; /* time interval in 10 ms units for stall checks*/
extern uint threadpool_oversubscribe; /* Maximum active threads in group */
extern uint threadpool_toobusy; /* Maximum active and waiting threads in group */
extern uint threadpool_high_prio_tickets;
extern ulong threadpool_high_prio_mode;

/* Possible values for thread_pool_high_prio_mode */
extern const char *threadpool_high_prio_mode_names[];
Expand Down Expand Up @@ -81,5 +79,8 @@ extern TP_STATISTICS tp_stats;
extern void tp_set_threadpool_size(uint val) noexcept;
extern void tp_set_threadpool_stall_limit(uint val) noexcept;

extern uint tp_get_thdvar_high_prio_tickets(THD *thd);
extern uint tp_get_thdvar_high_prio_mode(THD *thd);

#endif // THREADPOOL_H_

120 changes: 63 additions & 57 deletions plugin/thread_pool/threadpool_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */



#include "threadpool.h"
#include "threadpool_unix.h"
#include "my_thread_local.h"
#include "my_sys.h"
#include "mysql/plugin.h"
#include "mysql/psi/mysql_idle.h"
#include "mysql/thread_pool_priv.h"
#include "sql/debug_sync.h"
Expand All @@ -27,15 +27,12 @@
#include "sql/sql_connect.h"
#include "sql/protocol_classic.h"
#include "sql/sql_parse.h"
#include "mysql/plugin.h"
#include "sql/sql_table.h"
#include "sql/field.h"
#include "sql/sql_show.h"
#include "sql/sql_class.h"
#include "my_sys.h"
#include "mysql/plugin.h"
#include <dlfcn.h>

#include <memory>

#define MYSQL_SERVER 1

Expand All @@ -46,8 +43,6 @@ uint threadpool_stall_limit;
uint threadpool_max_threads;
uint threadpool_oversubscribe;
uint threadpool_toobusy;
uint threadpool_high_prio_tickets;
ulong threadpool_high_prio_mode;

/* Stats */
TP_STATISTICS tp_stats;
Expand Down Expand Up @@ -79,7 +74,7 @@ class Worker_thread_context {
#ifdef HAVE_PSI_THREAD_INTERFACE
PSI_thread *const psi_thread;
#endif
#ifndef DBUG_OFF
#ifndef NDEBUG
const my_thread_id thread_id;
#endif
public:
Expand All @@ -88,7 +83,7 @@ class Worker_thread_context {
#ifdef HAVE_PSI_THREAD_INTERFACE
psi_thread(PSI_THREAD_CALL(get_thread)())
#endif
#ifndef DBUG_OFF
#ifndef NDEBUG
,
thread_id(my_thread_var_id())
#endif
Expand All @@ -99,7 +94,7 @@ class Worker_thread_context {
#ifdef HAVE_PSI_THREAD_INTERFACE
PSI_THREAD_CALL(set_thread)(psi_thread);
#endif
#ifndef DBUG_OFF
#ifndef NDEBUG
set_my_thread_var_id(thread_id);
#endif
THR_MALLOC = nullptr;
Expand All @@ -110,7 +105,7 @@ class Worker_thread_context {
Attach/associate the connection with the OS thread,
*/
static bool thread_attach(THD *thd) {
#ifndef DBUG_OFF
#ifndef NDEBUG
set_my_thread_var_id(thd->thread_id());
#endif
thd->thread_stack = (char *)&thd;
Expand Down Expand Up @@ -171,7 +166,6 @@ int threadpool_add_connection(THD *thd) {
if (thd_connection_alive(thd)) {
retval = 0;
thd_set_net_read_write(thd, 1);
//thd->skip_wait_timeout = true; // !! todo
MYSQL_SOCKET_SET_STATE(thd->get_protocol_classic()->get_vio()->mysql_socket,
PSI_SOCKET_STATE_IDLE);
thd->m_server_idle = true;
Expand Down Expand Up @@ -305,65 +299,38 @@ static inline int my_getncpus() noexcept {
#endif
}

static MYSQL_SYSVAR_UINT(threadpool_idle_timeout, threadpool_idle_timeout,
static MYSQL_SYSVAR_UINT(idle_timeout, threadpool_idle_timeout,
PLUGIN_VAR_RQCMDARG,
"Timeout in seconds for an idle thread in the thread pool."
"Worker thread will be shut down after timeout",
NULL, NULL, 60, 1, UINT_MAX, 1);

static MYSQL_SYSVAR_UINT(threadpool_oversubscribe, threadpool_oversubscribe,
static MYSQL_SYSVAR_UINT(oversubscribe, threadpool_oversubscribe,
PLUGIN_VAR_RQCMDARG,
"How many additional active worker threads in a group are allowed.",
NULL, NULL, 3, 1, 1000, 1);

static MYSQL_SYSVAR_UINT(threadpool_toobusy, threadpool_toobusy,
static MYSQL_SYSVAR_UINT(toobusy, threadpool_toobusy,
PLUGIN_VAR_RQCMDARG,
"How many additional active and waiting worker threads in a group are allowed.",
NULL, NULL, 13, 1, 1000, 1);

static MYSQL_SYSVAR_UINT(threadpool_size, threadpool_size,
static MYSQL_SYSVAR_UINT(size, threadpool_size,
PLUGIN_VAR_RQCMDARG,
"Number of thread groups in the pool. "
"This parameter is roughly equivalent to maximum number of concurrently "
"executing threads (threads in a waiting state do not count as executing).",
NULL, fix_threadpool_size, (uint)my_getncpus(), 1, MAX_THREAD_GROUPS, 1);

static MYSQL_SYSVAR_UINT(threadpool_stall_limit, threadpool_stall_limit,
static MYSQL_SYSVAR_UINT(stall_limit, threadpool_stall_limit,
PLUGIN_VAR_RQCMDARG,
"Maximum query execution time in milliseconds,"
"before an executing non-yielding thread is considered stalled."
"If a worker thread is stalled, additional worker thread "
"may be created to handle remaining clients.",
NULL, fix_threadpool_stall_limit, 500, 10, UINT_MAX, 1);

static MYSQL_SYSVAR_UINT(threadpool_high_prio_tickets, threadpool_high_prio_tickets,
PLUGIN_VAR_RQCMDARG,
"Number of tickets to enter the high priority event queue for each "
"transaction.",
NULL, NULL , UINT_MAX, 0, UINT_MAX, 1);

const char *threadpool_high_prio_mode_names[] = {"transactions", "statements",
"none", NullS};
TYPELIB threadpool_high_prio_mode_typelib =
{
array_elements(threadpool_high_prio_mode_names) - 1, "threadpool_high_prio_mode",
threadpool_high_prio_mode_names, NULL
};

static MYSQL_SYSVAR_ENUM(threadpool_high_prio_mode, threadpool_high_prio_mode,
PLUGIN_VAR_RQCMDARG,
"High priority queue mode: one of 'transactions', 'statements' or 'none'. "
"In the 'transactions' mode the thread pool uses both high- and low-priority "
"queues depending on whether an event is generated by an already started "
"transaction and whether it has any high priority tickets (see "
"thread_pool_high_prio_tickets). In the 'statements' mode all events (i.e. "
"individual statements) always go to the high priority queue, regardless of "
"the current transaction state and high priority tickets. "
"'none' is the opposite of 'statements', i.e. disables the high priority queue "
"completely.",
NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib);

static MYSQL_SYSVAR_UINT(threadpool_max_threads, threadpool_max_threads,
static MYSQL_SYSVAR_UINT(max_threads, threadpool_max_threads,
PLUGIN_VAR_RQCMDARG,
"Maximum allowed number of worker threads in the thread pool",
NULL, NULL, MAX_CONNECTIONS, 1, MAX_CONNECTIONS, 1);
Expand All @@ -377,24 +344,55 @@ static int threadpool_plugin_init(void *)
DBUG_RETURN(0);
}


static int threadpool_plugin_deinit(void *)
{
DBUG_ENTER("threadpool_plugin_deinit");

my_connection_handler_reset();
DBUG_RETURN(0);
}

static struct SYS_VAR* system_variables[] = {
MYSQL_SYSVAR(threadpool_idle_timeout),
MYSQL_SYSVAR(threadpool_size),
MYSQL_SYSVAR(threadpool_max_threads),
MYSQL_SYSVAR(threadpool_stall_limit),
MYSQL_SYSVAR(threadpool_oversubscribe),
MYSQL_SYSVAR(threadpool_toobusy),
MYSQL_SYSVAR(threadpool_high_prio_tickets),
MYSQL_SYSVAR(threadpool_high_prio_mode),
static MYSQL_THDVAR_UINT(high_prio_tickets,
PLUGIN_VAR_NOCMDARG,
"Number of tickets to enter the high priority event queue for each "
"transaction.",
NULL, NULL, UINT_MAX, 0, UINT_MAX, 1);

const char *threadpool_high_prio_mode_names[] = {"transactions", "statements",
"none", NullS};
TYPELIB threadpool_high_prio_mode_typelib = {
array_elements(threadpool_high_prio_mode_names) - 1, "",
threadpool_high_prio_mode_names, NULL
};

static MYSQL_THDVAR_ENUM(high_prio_mode,
PLUGIN_VAR_NOCMDARG,
"High priority queue mode: one of 'transactions', 'statements' or 'none'. "
"In the 'transactions' mode the thread pool uses both high- and low-priority "
"queues depending on whether an event is generated by an already started "
"transaction and whether it has any high priority tickets (see "
"thread_pool_high_prio_tickets). In the 'statements' mode all events (i.e. "
"individual statements) always go to the high priority queue, regardless of "
"the current transaction state and high priority tickets. "
"'none' is the opposite of 'statements', i.e. disables the high priority queue "
"completely.",
NULL, NULL, TP_HIGH_PRIO_MODE_TRANSACTIONS, &threadpool_high_prio_mode_typelib);

static uint &idle_timeout = threadpool_idle_timeout;
static uint &size = threadpool_size;
static uint &stall_limit = threadpool_stall_limit;
static uint &max_threads = threadpool_max_threads;
static uint &oversubscribe = threadpool_oversubscribe;
static uint &toobusy = threadpool_toobusy;

SYS_VAR *system_variables[] = {
MYSQL_SYSVAR(idle_timeout),
MYSQL_SYSVAR(size),
MYSQL_SYSVAR(max_threads),
MYSQL_SYSVAR(stall_limit),
MYSQL_SYSVAR(oversubscribe),
MYSQL_SYSVAR(toobusy),
MYSQL_SYSVAR(high_prio_tickets),
MYSQL_SYSVAR(high_prio_mode),
NULL
};

Expand Down Expand Up @@ -741,3 +739,11 @@ mysql_declare_plugin(thread_pool)
}
mysql_declare_plugin_end;

uint tp_get_thdvar_high_prio_tickets(THD *thd) {
return THDVAR(thd, high_prio_tickets);
}

uint tp_get_thdvar_high_prio_mode(THD *thd) {
return THDVAR(thd, high_prio_mode);
}

Loading

0 comments on commit 1c0209b

Please sign in to comment.