From 91a9bb978bcddd5889c5dd12bdde3a54dd85c506 Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Wed, 8 Nov 2017 20:33:42 -0800 Subject: [PATCH] document that watcher stops see #273 and add each_with_retry --- README.md | 6 +++++ lib/kubeclient/watch_stream.rb | 12 ++++++++- test/test_watch.rb | 48 +++++++++++++++++++++++++++------- 3 files changed, 56 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index b804e40c..4a392c00 100644 --- a/README.md +++ b/README.md @@ -382,6 +382,12 @@ watcher.each do |notice| end ``` +The `.each` will stop when the connection is interrupted, use `each_with_retry` if you want to continue: + +```ruby +client.watch_pods.each_with_retry { |notice| ... } +``` + It is possible to interrupt the watcher from another thread with: ```ruby diff --git a/lib/kubeclient/watch_stream.rb b/lib/kubeclient/watch_stream.rb index 4902f2aa..6eb89d04 100644 --- a/lib/kubeclient/watch_stream.rb +++ b/lib/kubeclient/watch_stream.rb @@ -13,7 +13,6 @@ def initialize(uri, http_options, format: :json) def each @finished = false - @http_client = build_client response = @http_client.request(:get, @uri, build_client_options) unless response.code < 300 @@ -21,16 +20,27 @@ def each end buffer = '' + done = false response.body.each do |chunk| + done = false buffer << chunk while (line = buffer.slice!(/.+\n/)) yield(@format == :json ? WatchNotice.new(JSON.parse(line)) : line.chomp) end + done = true end + done rescue IOError raise unless @finished end + def each_with_retry(&block) + loop do + done = each(&block) + break if !done || @finished + end + end + def finish @finished = true @http_client.close unless @http_client.nil? diff --git a/test/test_watch.rb b/test/test_watch.rb index 37ca615f..270d32f8 100644 --- a/test/test_watch.rb +++ b/test/test_watch.rb @@ -9,15 +9,7 @@ def test_watch_pod_success { 'type' => 'DELETED', 'resourceVersion' => '1398' } ] - stub_request(:get, %r{/api/v1$}) - .to_return(body: open_test_file('core_api_resource_list.json'), - status: 200) - - stub_request(:get, %r{.*\/watch/pods}) - .to_return(body: open_test_file('watch_stream.json'), - status: 200) - - client = Kubeclient::Client.new('http://localhost:8080/api/', 'v1') + client = setup_regular_response client.watch_pods.to_enum.with_index do |notice, index| assert_instance_of(Kubeclient::Common::WatchNotice, notice) @@ -116,4 +108,42 @@ def test_watch_with_field_selector "#{api_host}/v1/watch/events?fieldSelector=#{selector}", times: 1) end + + def test_watch_with_retry + Timeout.timeout 1 do + events = 0 + setup_regular_response.watch_pods.each_with_retry do + events += 1 + break if events == 10 + end + assert_equal 10, events + end + end + + def test_watch_with_retry_and_finish + Timeout.timeout 1 do + events = 0 + watcher = setup_regular_response.watch_pods + watcher.each_with_retry do + watcher.finish + events += 1 + break if events == 10 + end + assert_equal 3, events + end + end + + private + + def setup_regular_response + stub_request(:get, %r{/api/v1$}) + .to_return(body: open_test_file('core_api_resource_list.json'), + status: 200) + + stub_request(:get, %r{.*\/watch/pods}) + .to_return(body: open_test_file('watch_stream.json'), + status: 200) + + Kubeclient::Client.new('http://localhost:8080/api/', 'v1') + end end