diff --git a/lib/puma/thread_pool.rb b/lib/puma/thread_pool.rb index 1a6c10e0..bf5635df 100644 --- a/lib/puma/thread_pool.rb +++ b/lib/puma/thread_pool.rb @@ -268,58 +268,20 @@ def <<(work) end end - # This method is used by `Puma::Server` to let the server know when - # the thread pool can pull more requests from the socket and - # pass to the reactor. + # This method is used by `Puma::Server` to let the server know when the + # server can pull (accept) more requests from the listener socket and + # process them. # - # The general idea is that the thread pool can only work on a fixed - # number of requests at the same time. If it is already processing that - # number of requests then it is at capacity. If another Puma process has - # spare capacity, then the request can be left on the socket so the other - # worker can pick it up and process it. + # The wait time delay is based on how busy the thread pool is, accounting + # for both 'in-process' requests and pending (@todo array) requests. The + # delay will be zero if all threads are not busy and there are no pending + # requests. # - # For example: if there are 5 threads, but only 4 working on - # requests, this method will not wait and the `Puma::Server` - # can pull a request right away. - # - # If there are 5 threads and all 5 of them are busy, then it will - # pause here, and wait until the `not_full` condition variable is - # signaled, usually this indicates that a request has been processed. - # - # It's important to note that even though the server might accept another - # request, it might not be added to the `@todo` array right away. - # For example if a slow client has only sent a header, but not a body - # then the `@todo` array would stay the same size as the reactor works - # to try to buffer the request. In that scenario the next call to this - # method would not block and another request would be added into the reactor - # by the server. This would continue until a fully buffered request - # makes it through the reactor and can then be processed by the thread pool. def wait_until_not_full return if @shutdown sleep 0.005 * busy_threads/@max.to_f end - # @version 5.0.0 - def wait_for_less_busy_worker(delay_s) - return unless delay_s && delay_s > 0 - - # Ruby MRI does GVL, this can result - # in processing contention when multiple threads - # (requests) are running concurrently - return unless Puma.mri? - - with_mutex do - return if @shutdown - - # do not delay, if we are not busy - return unless busy_threads > 0 - - # this will be signaled once a request finishes, - # which can happen earlier than delay - @not_full.wait @mutex, delay_s - end - end - # If there are any free threads in the pool, tell one to go ahead # and exit. If +force+ is true, then a trim request is requested # even if all threads are being utilized. diff --git a/test/test_busy_worker.rb b/test/test_busy_worker.rb deleted file mode 100644 index fc4f3f81..00000000 --- a/test/test_busy_worker.rb +++ /dev/null @@ -1,110 +0,0 @@ -require_relative "helper" -require_relative "helpers/test_puma/puma_socket" - -class TestBusyWorker < Minitest::Test - - include ::TestPuma::PumaSocket - - def setup - skip_unless :mri # This feature only makes sense on MRI - @server = nil - end - - def teardown - return if skipped? - @server&.stop true - end - - def with_server(**options, &app) - @requests_count = 0 # number of requests processed - @requests_running = 0 # current number of requests running - @requests_max_running = 0 # max number of requests running in parallel - @mutex = Mutex.new - - request_handler = ->(env) do - @mutex.synchronize do - @requests_count += 1 - @requests_running += 1 - if @requests_running > @requests_max_running - @requests_max_running = @requests_running - end - end - - begin - yield(env) - ensure - @mutex.synchronize do - @requests_running -= 1 - end - end - end - - options[:min_threads] ||= 1 - options[:max_threads] ||= 10 - options[:log_writer] ||= Puma::LogWriter.strings - - @server = Puma::Server.new request_handler, nil, **options - @bind_port = (@server.add_tcp_listener '127.0.0.1', 0).addr[1] - @server.run - end - - # Multiple concurrent requests are not processed - # sequentially as a small delay is introduced - def test_multiple_requests_waiting_on_less_busy_worker - with_server(wait_for_less_busy_worker: 1.0, workers: 2) do |_| - sleep(0.1) - - [200, {}, [""]] - end - - n = 2 - - sockets = send_http_array GET_10, n - - read_response_array(sockets) - - assert_equal n, @requests_count, "number of requests needs to match" - assert_equal 0, @requests_running, "none of requests needs to be running" - assert_equal 1, @requests_max_running, "maximum number of concurrent requests needs to be 1" - end - - # Multiple concurrent requests are processed - # in parallel as a delay is disabled - def test_multiple_requests_processing_in_parallel - with_server(wait_for_less_busy_worker: 0.0, workers: 2) do |_| - sleep(0.1) - - [200, {}, [""]] - end - - n = 4 - - sockets = send_http_array GET_10, n - - read_response_array(sockets) - - assert_equal n, @requests_count, "number of requests needs to match" - assert_equal 0, @requests_running, "none of requests needs to be running" - assert_equal n, @requests_max_running, "maximum number of concurrent requests needs to match" - end - - def test_not_wait_for_less_busy_worker - with_server do - [200, {}, [""]] - end - - assert_not_called_on_instance_of(Puma::ThreadPool, :wait_for_less_busy_worker) do - send_http_read_response "GET / HTTP/1.0\r\n\r\n" - end - end - - def test_wait_for_less_busy_worker - with_server(workers: 2) do - [200, {}, [""]] - end - - assert_called_on_instance_of(Puma::ThreadPool, :wait_for_less_busy_worker) do - send_http_read_response "GET / HTTP/1.0\r\n\r\n" - end - end -end