From 94d08e12ba17155b6e55ecb4421b2ccbd7e43702 Mon Sep 17 00:00:00 2001 From: Sam Vevang Date: Wed, 30 Oct 2024 11:06:02 -0500 Subject: [PATCH 1/3] Retry on error_apple non-terminal error --- app/models/publishing_pipeline_state.rb | 12 ++++- app/models/publishing_queue_item.rb | 2 +- test/models/publishing_pipeline_state_test.rb | 47 +++++++++++++++++++ test/models/publishing_queue_item_test.rb | 14 ++++++ 4 files changed, 72 insertions(+), 3 deletions(-) diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index c6b82c0fe..7c8361485 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -1,6 +1,6 @@ class PublishingPipelineState < ApplicationRecord TERMINAL_STATUSES = [:complete, :error, :expired].freeze - TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze + FAILURE_STATUSES = [:error, :expired, :error_apple].freeze UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started] # Handle the max timout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight @@ -173,8 +173,16 @@ def self.expire_pipelines! end end + def self.latest_failed_publishing_queue_items + PublishingQueueItem.where(id: latest_failed_pipelines.select(:publishing_queue_item_id).distinct) + end + + def self.latest_failed_podcasts + Podcast.where(id: latest_failed_publishing_queue_items.select(:podcast_id).distinct) + end + def self.retry_failed_pipelines! - Podcast.where(id: latest_failed_pipelines.select(:podcast_id).distinct).each do |podcast| + latest_failed_podcasts.each do |podcast| Rails.logger.tagged("PublishingPipeLineState.retry_failed_pipelines!", "Podcast:#{podcast.id}") do start_pipeline!(podcast) end diff --git a/app/models/publishing_queue_item.rb b/app/models/publishing_queue_item.rb index c4a408771..25fb142da 100644 --- a/app/models/publishing_queue_item.rb +++ b/app/models/publishing_queue_item.rb @@ -8,7 +8,7 @@ class PublishingQueueItem < ApplicationRecord latest_by_status(PublishingPipelineState::TERMINAL_STATUSES) } scope :latest_failed, -> { - latest_by_status(PublishingPipelineState::TERMINAL_FAILURE_STATUSES) + latest_by_status(PublishingPipelineState::FAILURE_STATUSES) } scope :latest_by_status, ->(status) { diff --git a/test/models/publishing_pipeline_state_test.rb b/test/models/publishing_pipeline_state_test.rb index 4f3803f0c..73c14d4e1 100644 --- a/test/models/publishing_pipeline_state_test.rb +++ b/test/models/publishing_pipeline_state_test.rb @@ -186,6 +186,37 @@ end end + describe ".latest_failed_pipelines" do + it "returns the latest failed pipelines including intermediate and terminal errors" do + # Create a publishing queue item and associated pipeline state + pqi1 = PublishingQueueItem.ensure_queued!(podcast) + _s1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi1) + PublishingPipelineState.error_apple!(podcast) + PublishingPipelineState.complete!(podcast) + + # Verify that the intermediate error is included in the latest failed pipelines + assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts + assert_equal ["created", "error_apple", "complete"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).map(&:status) + + # Create another publishing queue item and associated pipeline state + pqi2 = PublishingQueueItem.ensure_queued!(podcast) + _s2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi2) + PublishingPipelineState.error!(podcast) + + # Verify that the terminal error is included in the latest failed pipelines + assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts + assert_equal ["created", "error"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).map(&:status) + + # Verify that a successful pipeline is not included in the latest failed pipelines + pqi3 = PublishingQueueItem.ensure_queued!(podcast) + _s3 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: pqi3) + PublishingPipelineState.complete!(podcast) + + assert_equal [].sort, PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast) + assert ["created", "complete"], PublishingPipelineState.latest_pipelines.where(podcast: podcast).pluck(:status) + end + end + describe ".retry_failed_pipelines!" do it "should retry failed pipelines" do PublishingPipelineState.start_pipeline!(podcast) @@ -200,6 +231,22 @@ assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort end + it "retries pipelines with intermediate error_apple and non-error terminal status" do + PublishingPipelineState.start_pipeline!(podcast) + assert_equal ["created"], PublishingPipelineState.latest_pipeline(podcast).map(&:status) + + # it fails + PublishingPipelineState.error_apple!(podcast) + assert_equal ["created", "error_apple"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + + PublishingPipelineState.complete!(podcast) + assert_equal ["created", "error_apple", "complete"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + + # it retries + PublishingPipelineState.retry_failed_pipelines! + assert_equal ["created"].sort, PublishingPipelineState.latest_pipeline(podcast).map(&:status).sort + end + it "ignores previously errored pipelines back in the queue" do # A failed pipeline PublishingPipelineState.start_pipeline!(podcast) diff --git a/test/models/publishing_queue_item_test.rb b/test/models/publishing_queue_item_test.rb index d9059956c..941890078 100644 --- a/test/models/publishing_queue_item_test.rb +++ b/test/models/publishing_queue_item_test.rb @@ -94,6 +94,20 @@ assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) end + + it "includes intermediate states like error_apple" do + pqi1 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + PublishingPipelineState.error_apple!(podcast) + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + assert_equal [podcast], PublishingPipeLineState.latest_failed_podcasts + + _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item + + assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) + assert_equal [].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) + end end describe ".all_unfinished_items" do From d2e5f2a4b40bb592dfc547b05e889008660f3880 Mon Sep 17 00:00:00 2001 From: Sam Vevang Date: Wed, 30 Oct 2024 13:59:34 -0500 Subject: [PATCH 2/3] Fixup test --- app/models/publishing_pipeline_state.rb | 2 +- test/models/publishing_queue_item_test.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 7c8361485..2ff90ea5d 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -19,7 +19,7 @@ class PublishingPipelineState < ApplicationRecord scope :latest_failed_pipelines, -> { # Grab the latest attempted Publishing Item AND the latest failed Pub Item. # If that is a non-null intersection, then we have a current/latest+failed pipeline. - where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.select(:id)) + where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.order(id: :asc).select(:id)) } scope :latest_by_queue_item, -> { diff --git a/test/models/publishing_queue_item_test.rb b/test/models/publishing_queue_item_test.rb index 941890078..0d698a5f1 100644 --- a/test/models/publishing_queue_item_test.rb +++ b/test/models/publishing_queue_item_test.rb @@ -101,7 +101,7 @@ assert_equal [pqi1].sort, PublishingQueueItem.latest_failed.where(podcast: podcast) assert_equal [pqi1].sort, PublishingQueueItem.latest_attempted.latest_failed.where(podcast: podcast) - assert_equal [podcast], PublishingPipeLineState.latest_failed_podcasts + assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts _pqi2 = PublishingPipelineState.create!(podcast: podcast, publishing_queue_item: PublishingQueueItem.create!(podcast: podcast)).publishing_queue_item From b4888561400166c6c78cc230ca658770ed7b2505 Mon Sep 17 00:00:00 2001 From: Sam Vevang Date: Mon, 18 Nov 2024 14:13:00 -0600 Subject: [PATCH 3/3] Remove ordering, and enforce in test --- app/models/publishing_pipeline_state.rb | 2 +- test/models/publishing_pipeline_state_test.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 2ff90ea5d..7c8361485 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -19,7 +19,7 @@ class PublishingPipelineState < ApplicationRecord scope :latest_failed_pipelines, -> { # Grab the latest attempted Publishing Item AND the latest failed Pub Item. # If that is a non-null intersection, then we have a current/latest+failed pipeline. - where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.order(id: :asc).select(:id)) + where(publishing_queue_item_id: PublishingQueueItem.latest_attempted.latest_failed.select(:id)) } scope :latest_by_queue_item, -> { diff --git a/test/models/publishing_pipeline_state_test.rb b/test/models/publishing_pipeline_state_test.rb index 73c14d4e1..0acf778b5 100644 --- a/test/models/publishing_pipeline_state_test.rb +++ b/test/models/publishing_pipeline_state_test.rb @@ -196,7 +196,7 @@ # Verify that the intermediate error is included in the latest failed pipelines assert_equal [podcast], PublishingPipelineState.latest_failed_podcasts - assert_equal ["created", "error_apple", "complete"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).map(&:status) + assert_equal ["created", "error_apple", "complete"], PublishingPipelineState.latest_failed_pipelines.where(podcast: podcast).order(id: :asc).map(&:status) # Create another publishing queue item and associated pipeline state pqi2 = PublishingQueueItem.ensure_queued!(podcast)