From 9ddfcff75ccf291ba7623d42dd6ffac1712a06fd Mon Sep 17 00:00:00 2001 From: YaacovHazan <31382944+YaacovHazan@users.noreply.github.com> Date: Thu, 16 Nov 2023 17:52:58 +0200 Subject: [PATCH] Add rate limiting to control the number of requests per second (#237) * Add rate limiting to control the number of requests per second A new 'rate-limiting' option was added to control the number of request per second. The rate limiting is based on the 'Token Bucket' algorithm, and according to the configured rate, on each interval, a "new" amount of requests allowed to be sent to the server. The rate-limiting is at the connection level. Therefore, in cluster mode, the limitation is for each shard, so if, for example, the cluster has three shards and the user configured one request per second. On every second, memtier-benchmark will send three requests, one for each shard. * Added tests to cover --rate-limiting option. Ensured the help message explains the per connection rate-limit * Fixed per cluster rps test limits --------- Co-authored-by: YaacovHazan Co-authored-by: filipecosta90 --- memtier_benchmark.cpp | 28 +++++++++++++++++++++++- memtier_benchmark.h | 3 +++ shard_connection.cpp | 39 +++++++++++++++++++++++++++++++--- shard_connection.h | 4 ++++ tests/include.py | 14 +++++++----- tests/tests_oss_simple_flow.py | 39 ++++++++++++++++++++++++++++++++++ 6 files changed, 118 insertions(+), 9 deletions(-) diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index f7e0da31..26e585cf 100755 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -127,6 +127,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg) "run_count = %u\n" "debug = %u\n" "requests = %llu\n" + "rate_limit = %u\n" "clients = %u\n" "threads = %u\n" "test_time = %u\n" @@ -176,6 +177,7 @@ static void config_print(FILE *file, struct benchmark_config *cfg) cfg->run_count, cfg->debug, cfg->requests, + cfg->request_rate, cfg->clients, cfg->threads, cfg->test_time, @@ -233,6 +235,7 @@ static void config_print_to_json(json_handler * jsonhandler, struct benchmark_co jsonhandler->write_obj("run_count" ,"%u", cfg->run_count); jsonhandler->write_obj("debug" ,"%u", cfg->debug); jsonhandler->write_obj("requests" ,"%llu", cfg->requests); + jsonhandler->write_obj("rate_limit" ,"%u", cfg->request_rate); jsonhandler->write_obj("clients" ,"%u", cfg->clients); jsonhandler->write_obj("threads" ,"%u", cfg->threads); jsonhandler->write_obj("test_time" ,"%u", cfg->test_time); @@ -421,6 +424,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf o_tls_sni, o_tls_protocols, o_hdr_file_prefix, + o_rate_limiting, o_help }; @@ -489,6 +493,7 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf { "command", 1, 0, o_command }, { "command-key-pattern", 1, 0, o_command_key_pattern }, { "command-ratio", 1, 0, o_command_ratio }, + { "rate-limiting", 1, 0, o_rate_limiting }, { NULL, 0, 0, 0 } }; @@ -861,6 +866,15 @@ static int config_parse_args(int argc, char *argv[], struct benchmark_config *cf } break; } + case o_rate_limiting: { + endptr = NULL; + cfg->request_rate = (unsigned int) strtoul(optarg, &endptr, 10); + if (!cfg->request_rate || !endptr || *endptr != '\0') { + fprintf(stderr, "error: rate must be greater than zero.\n"); + return -1; + } + break; + } #ifdef USE_TLS case o_tls: cfg->tls = true; @@ -948,7 +962,7 @@ void usage() { " --key=FILE Use specified private key for TLS\n" " --cacert=FILE Use specified CA certs bundle for TLS\n" " --tls-skip-verify Skip verification of server certificate\n" - " --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'" + " --tls-protocols Specify the tls protocol version to use, comma delemited. Use a combination of 'TLSv1', 'TLSv1.1', 'TLSv1.2' and 'TLSv1.3'.\n" " --sni=STRING Add an SNI header\n" #endif " -x, --run-count=NUMBER Number of full-test iterations to perform\n" @@ -967,6 +981,8 @@ void usage() { "Test Options:\n" " -n, --requests=NUMBER Number of total requests per client (default: 10000)\n" " use 'allkeys' to run on the entire key-range\n" + " --rate-limiting=NUMBER The max number of requests to make per second from an individual connection (default is unlimited rate).\n" + " If you use --rate-limiting and a very large rate is entered which cannot be met, memtier will do as many requests as possible per second.\n" " -c, --clients=NUMBER Number of clients per thread (default: 50)\n" " -t, --threads=NUMBER Number of threads (default: 4)\n" " --test-time=SECS Number of seconds to run the test\n" @@ -1348,6 +1364,16 @@ int main(int argc, char *argv[]) delete tmp_protocol; } + // if user configured rate limiting, do some calculations + if (cfg.request_rate) { + /* Our event resolution is (at least) 50 events per second (event every >= 20 ml). + * When we calculate the number of request per interval, we are taking + * the upper bound and adjust the interval accordingly to get more accuracy */ + cfg.request_per_interval = (cfg.request_rate + 50 - 1) / 50; + unsigned int events_per_second = cfg.request_rate / cfg.request_per_interval; + cfg.request_interval_microsecond = 1000000 / events_per_second; + benchmark_debug_log("Rate limiting configured to send %u requests per %u millisecond\n", cfg.request_per_interval, cfg.request_interval_microsecond / 1000); + } #ifdef USE_TLS // Initialize OpenSSL only if we're really going to use it. diff --git a/memtier_benchmark.h b/memtier_benchmark.h index d3e7e697..b7a71903 100644 --- a/memtier_benchmark.h +++ b/memtier_benchmark.h @@ -104,6 +104,9 @@ struct benchmark_config { bool cluster_mode; struct arbitrary_command_list* arbitrary_commands; const char *hdr_prefix; + unsigned int request_rate; + unsigned int request_per_interval; + unsigned int request_interval_microsecond; #ifdef USE_TLS bool tls; const char *tls_cert; diff --git a/shard_connection.cpp b/shard_connection.cpp index bcf331ad..89dd9865 100644 --- a/shard_connection.cpp +++ b/shard_connection.cpp @@ -56,6 +56,13 @@ #include "event2/bufferevent_ssl.h" #endif +void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx) +{ + shard_connection *sc = (shard_connection *) ctx; + assert(sc != NULL); + sc->handle_timer_event(); +} + void cluster_client_read_handler(bufferevent *bev, void *ctx) { shard_connection *sc = (shard_connection *) ctx; @@ -66,7 +73,6 @@ void cluster_client_read_handler(bufferevent *bev, void *ctx) void cluster_client_event_handler(bufferevent *bev, short events, void *ctx) { shard_connection *sc = (shard_connection *) ctx; - assert(sc != NULL); sc->handle_event(events); } @@ -123,7 +129,7 @@ verify_request::~verify_request(void) shard_connection::shard_connection(unsigned int id, connections_manager* conns_man, benchmark_config* config, struct event_base* event_base, abstract_protocol* abs_protocol) : m_address(NULL), m_port(NULL), m_unix_sockaddr(NULL), - m_bev(NULL), m_pending_resp(0), m_connection_state(conn_disconnected), + m_bev(NULL), m_request_per_cur_interval(0), m_pending_resp(0), m_connection_state(conn_disconnected), m_hello(setup_done), m_authentication(setup_done), m_db_selection(setup_done), m_cluster_slots(setup_done) { m_id = id; m_conns_manager = conns_man; @@ -341,6 +347,10 @@ request* shard_connection::pop_req() { void shard_connection::push_req(request* req) { m_pipeline->push(req); m_pending_resp++; + if (m_config->request_rate) { + assert(m_request_per_cur_interval > 0); + m_request_per_cur_interval--; + } } bool shard_connection::is_conn_setup_done() { @@ -486,21 +496,28 @@ void shard_connection::process_first_request() { fill_pipeline(); } - void shard_connection::fill_pipeline(void) { struct timeval now; gettimeofday(&now, NULL); + while (!m_conns_manager->finished() && m_pipeline->size() < m_config->pipeline) { if (!is_conn_setup_done()) { send_conn_setup_commands(now); return; } + // don't exceed requests if (m_conns_manager->hold_pipeline(m_id)) { break; } + // that's enough, we reached the rate limit + if (m_config->request_rate && m_request_per_cur_interval == 0) { + // return and skip on update events + return; + } + // client manage requests logic m_conns_manager->create_request(now, m_id); } @@ -511,6 +528,9 @@ void shard_connection::fill_pipeline(void) if ((m_pending_resp == 0) && (evbuffer_get_length(bufferevent_get_output(m_bev)) == 0)) { benchmark_debug_log("%s Done, no requests to send no response to wait for\n", get_readable_id()); bufferevent_disable(m_bev, EV_WRITE|EV_READ); + if (m_config->request_rate) { + event_del(m_event_timer); + } } } } @@ -526,6 +546,14 @@ void shard_connection::handle_event(short events) bufferevent_enable(m_bev, EV_READ|EV_WRITE); if (!m_conns_manager->get_reqs_processed()) { + /* Set timer for request rate */ + if (m_config->request_rate) { + struct timeval interval = { 0, (long int)m_config->request_interval_microsecond }; + m_request_per_cur_interval = m_config->request_per_interval; + m_event_timer = event_new(m_event_base, -1, EV_PERSIST, cluster_client_timer_handler, (void *)this); + event_add(m_event_timer, &interval); + } + process_first_request(); } else { benchmark_debug_log("reconnection complete, proceeding with test\n"); @@ -561,6 +589,11 @@ void shard_connection::handle_event(short events) } } +void shard_connection::handle_timer_event() { + m_request_per_cur_interval = m_config->request_per_interval; + fill_pipeline(); +} + void shard_connection::send_wait_command(struct timeval* sent_time, unsigned int num_slaves, unsigned int timeout) { int cmd_size = 0; diff --git a/shard_connection.h b/shard_connection.h index 9a06f735..90a2cb41 100644 --- a/shard_connection.h +++ b/shard_connection.h @@ -76,6 +76,7 @@ struct verify_request : public request { }; class shard_connection { + friend void cluster_client_timer_handler(evutil_socket_t fd, short what, void *ctx); friend void cluster_client_read_handler(bufferevent *bev, void *ctx); friend void cluster_client_event_handler(bufferevent *bev, short events, void *ctx); @@ -148,6 +149,7 @@ class shard_connection { void fill_pipeline(void); void handle_event(short evtype); + void handle_timer_event(); unsigned int m_id; connections_manager* m_conns_manager; @@ -160,9 +162,11 @@ class shard_connection { struct sockaddr_un* m_unix_sockaddr; struct bufferevent *m_bev; struct event_base* m_event_base; + struct event* m_event_timer; abstract_protocol* m_protocol; std::queue* m_pipeline; + unsigned int m_request_per_cur_interval; // number requests to send during the current interval int m_pending_resp; diff --git a/tests/include.py b/tests/include.py index 9cfb4c55..dd268883 100644 --- a/tests/include.py +++ b/tests/include.py @@ -16,7 +16,7 @@ def ensure_tls_protocols(master_nodes_connections): def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, - overall_request_count): + overall_request_count, overall_request_delta=None): failed_asserts = env.getNumberOfFailedAssertion() try: # assert correct exit code @@ -25,8 +25,11 @@ def assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_re env.assertTrue(os.path.isfile('{0}/mb.stdout'.format(config.results_dir))) env.assertTrue(os.path.isfile('{0}/mb.stderr'.format(config.results_dir))) env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir))) - # assert we have the expected request count - env.assertEqual(overall_expected_request_count, overall_request_count) + if overall_request_delta is None: + # assert we have the expected request count + env.assertEqual(overall_expected_request_count, overall_request_count) + else: + env.assertAlmostEqual(overall_expected_request_count, overall_request_count,overall_request_delta) finally: if env.getNumberOfFailedAssertion() > failed_asserts: debugPrintMemtierOnError(config, env) @@ -108,13 +111,14 @@ def addTLSArgs(benchmark_specs, env): -def get_default_memtier_config(threads=10, clients=5, requests=1000): +def get_default_memtier_config(threads=10, clients=5, requests=1000, test_time=None): config = { "memtier_benchmark": { "binary": MEMTIER_BINARY, "threads": threads, "clients": clients, - "requests": requests + "requests": requests, + "test_time": test_time }, } return config diff --git a/tests/tests_oss_simple_flow.py b/tests/tests_oss_simple_flow.py index 30541869..f0743acd 100644 --- a/tests/tests_oss_simple_flow.py +++ b/tests/tests_oss_simple_flow.py @@ -378,3 +378,42 @@ def test_default_arbitrary_command_hset_multi_data_placeholders(env): overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count) + +def test_default_set_get_rate_limited(env): + master_nodes_list = env.getMasterNodesList() + for client_count in [1,2,4]: + for thread_count in [1,2]: + rps_per_client = 100 + test_time_secs = 5 + overall_expected_rps = rps_per_client * client_count * thread_count * len(master_nodes_list) + overall_expected_request_count = test_time_secs * overall_expected_rps + # we give a 1 sec margin + request_delta = overall_expected_rps + # we will specify rate limit and the test time, which should help us get an approximate request count + benchmark_specs = {"name": env.testName, "args": ['--rate-limiting={}'.format(rps_per_client)]} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(thread_count,client_count,None,test_time_secs) + + master_nodes_connections = env.getOSSMasterNodesConnectionList() + + # reset the commandstats + for master_connection in master_nodes_connections: + master_connection.execute_command("CONFIG", "RESETSTAT") + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + # Create a temporary directory + test_dir = tempfile.mkdtemp() + + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # benchmark.run() returns True if the return code of memtier_benchmark was 0 + memtier_ok = benchmark.run() + + master_nodes_connections = env.getOSSMasterNodesConnectionList() + merged_command_stats = {'cmdstat_set': {'calls': 0}, 'cmdstat_get': {'calls': 0}} + overall_request_count = agg_info_commandstats(master_nodes_connections, merged_command_stats) + assert_minimum_memtier_outcomes(config, env, memtier_ok, overall_expected_request_count, overall_request_count, request_delta)