diff --git a/app/jobs/publish_feed_job.rb b/app/jobs/publish_feed_job.rb index d25f9e271..50a3f63c7 100644 --- a/app/jobs/publish_feed_job.rb +++ b/app/jobs/publish_feed_job.rb @@ -32,6 +32,12 @@ def publish_apple(podcast, feed) res = PublishAppleJob.do_perform(podcast.apple_config) PublishingPipelineState.publish_apple!(podcast) res + rescue Apple::AssetStateTimeoutError => e + # Not strictly a 'fail state' because we want to retry this job + PublishingPipelineState.error_apple!(podcast) + Rails.logger.send(e.log_level, e.message, {podcast_id: podcast.id}) + NewRelic::Agent.notice_error(e) + raise e if podcast.apple_config.sync_blocks_rss rescue => e if podcast.apple_config.sync_blocks_rss fail_state(podcast, "apple", e) @@ -49,16 +55,32 @@ def publish_rss(podcast, feed) fail_state(podcast, "rss", e) end - def fail_state(podcast, type, error) - (pipeline_method, log_level) = case type - when "apple" then [:error_apple!, :warn] - when "rss" then [:error_rss!, :warn] - when "apple_timeout", "error" then [:error!, :error] + def apple_timeout_log_level(error) + error.try(:log_level) || :error + end + + def should_raise?(error) + if error.respond_to?(:raise_publishing_error?) + error.raise_publishing_error? + else + true end + end + + def fail_state(podcast, type, error) + (pipeline_method, log_level) = + case type + when "apple" then [:error_apple!, :warn] + when "rss" then [:error_rss!, :warn] + when "apple_timeout" + level = apple_timeout_log_level(error) + [:retry!, level] + when "error" then [:error!, :error] + end PublishingPipelineState.public_send(pipeline_method, podcast) Rails.logger.send(log_level, error.message, {podcast_id: podcast.id}) - raise error + raise error if should_raise?(error) end def save_file(podcast, feed, options = {}) diff --git a/app/models/apple/asset_state_timeout_error.rb b/app/models/apple/asset_state_timeout_error.rb index 82e0b19b7..e90b7c10f 100644 --- a/app/models/apple/asset_state_timeout_error.rb +++ b/app/models/apple/asset_state_timeout_error.rb @@ -12,5 +12,20 @@ def initialize(episodes) def episode_ids episodes.map(&:feeder_id) end + + def raise_publishing_error? + %i[error fatal].include?(log_level) + end + + def log_level + case attempts + when 0..4 + :warn + when 5 + :error + else + :fatal + end + end end end diff --git a/app/models/apple/episode_delivery_status.rb b/app/models/apple/episode_delivery_status.rb index 5781c180c..24bf5bcfd 100644 --- a/app/models/apple/episode_delivery_status.rb +++ b/app/models/apple/episode_delivery_status.rb @@ -2,6 +2,18 @@ module Apple class EpisodeDeliveryStatus < ApplicationRecord belongs_to :episode, -> { with_deleted }, class_name: "::Episode" + def self.measure_asset_processing_duration(apple_episode_delivery_statuses) + statuses = apple_episode_delivery_statuses.to_a + + last_status = statuses.shift + return nil unless last_status&.asset_processing_attempts.to_i.positive? + + start_status = statuses.find { |status| status.asset_processing_attempts.to_i.zero? } + return nil unless start_status + + Time.now - start_status.created_at + end + def self.update_status(episode, attrs) new_status = (episode.apple_episode_delivery_status&.dup || default_status(episode)) new_status.assign_attributes(attrs) diff --git a/app/models/concerns/apple_delivery.rb b/app/models/concerns/apple_delivery.rb index 0083481b7..3bf177e79 100644 --- a/app/models/concerns/apple_delivery.rb +++ b/app/models/concerns/apple_delivery.rb @@ -40,7 +40,7 @@ def apple_needs_delivery? end def apple_needs_delivery! - apple_update_delivery_status(delivered: false) + apple_update_delivery_status(delivered: false, asset_processing_attempts: 0) end def apple_has_delivery! @@ -48,15 +48,7 @@ def apple_has_delivery! end def measure_asset_processing_duration - statuses = apple_episode_delivery_statuses.to_a - - last_status = statuses.shift - return nil unless last_status&.asset_processing_attempts.to_i.positive? - - start_status = statuses.find { |status| status.asset_processing_attempts.to_i.zero? } - return nil unless start_status - - Time.now - start_status.created_at + Apple::EpisodeDeliveryStatus.measure_asset_processing_duration(apple_episode_delivery_statuses) end def apple_prepare_for_delivery! diff --git a/app/models/publishing_pipeline_state.rb b/app/models/publishing_pipeline_state.rb index 3fcb5baf7..5be1712bc 100644 --- a/app/models/publishing_pipeline_state.rb +++ b/app/models/publishing_pipeline_state.rb @@ -1,6 +1,6 @@ class PublishingPipelineState < ApplicationRecord - TERMINAL_STATUSES = [:complete, :error, :expired].freeze - TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze + TERMINAL_STATUSES = [:complete, :error, :expired, :retry].freeze + TERMINAL_FAILURE_STATUSES = [:error, :expired, :retry].freeze UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started] # Handle the max timout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight @@ -53,7 +53,8 @@ class PublishingPipelineState < ApplicationRecord :error, :expired, :error_apple, - :error_rss + :error_rss, + :retry ] validate :podcast_ids_match @@ -170,6 +171,10 @@ def self.expire!(podcast) state_transition(podcast, :expired) end + def self.retry!(podcast) + state_transition(podcast, :retry) + end + def self.expire_pipelines! Podcast.with_deleted.where(id: expired_pipelines.select(:podcast_id)).each do |podcast| Rails.logger.tagged("PublishingPipeLineState.expire_pipelines!", "Podcast:#{podcast.id}") do diff --git a/test/jobs/publish_feed_job_test.rb b/test/jobs/publish_feed_job_test.rb index f21445bbf..d3bead0c5 100644 --- a/test/jobs/publish_feed_job_test.rb +++ b/test/jobs/publish_feed_job_test.rb @@ -99,12 +99,16 @@ end describe "publishing to apple" do + let(:podcast) { create(:podcast) } + let(:public_feed) { podcast.default_feed } + let(:private_feed) { create(:apple_feed, podcast: podcast) } let(:apple_feed) { private_feed } - let(:apple_config) { podcast.apple_config } + let(:apple_config) { private_feed.apple_config } + let(:apple_publisher) { apple_config.build_publisher } before do assert private_feed.persisted? - assert podcast.reload.apple_config.present? + assert apple_config.persisted? end describe "#perform" do @@ -151,30 +155,55 @@ PublishingQueueItem.create!(podcast: feed.podcast) end - it "raises an error if the apple publishing fails" do + let(:episode1) { build(:uploaded_apple_episode, show: apple_publisher.show) } + let(:episode2) { build(:uploaded_apple_episode, show: apple_publisher.show) } + let(:episodes) { [episode1, episode2] } + + it "logs message if the apple publishing times out" do assert apple_feed.apple_config.present? assert apple_feed.apple_config.publish_enabled - PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do - assert_raises(RuntimeError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) } - - assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort + expected_level_for_timeouts = [ + [0, 40], + [1, 40], + [2, 40], + [3, 40], + [4, 40], + [5, 50], + [6, 60] + ] + + expected_level_for_timeouts.each do |(attempts, level)| + # simulate a episode waiting n times + episodes.first.apple_episode_delivery_status.update(asset_processing_attempts: attempts) + + PublishFeedJob.stub(:s3_client, stub_client) do + PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new(episodes) }) do + lines = capture_json_logs do + PublishingQueueItem.ensure_queued!(feed.podcast) + PublishingPipelineState.attempt!(feed.podcast, perform_later: false) + rescue + nil + end + + log = lines.find { |l| l["msg"].include?("Timeout waiting for asset state change") } + assert log.present? + assert_equal level, log["level"] + + assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(:id).pluck(:status) + end + end end end - it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do + it "raises an error if the apple publishing times out" do assert apple_feed.apple_config.present? assert apple_feed.apple_config.publish_enabled - apple_feed.apple_config.update!(sync_blocks_rss: false) - feed.reload - PublishFeedJob.stub(:s3_client, stub_client) do - PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do - # no error raised - PublishingPipelineState.attempt!(feed.podcast, perform_later: false) + PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do + assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) } - assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort - end + assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status) end end @@ -185,7 +214,23 @@ PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) } - assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort + assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status) + end + end + + it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do + assert apple_feed.apple_config.present? + assert apple_feed.apple_config.publish_enabled + apple_feed.apple_config.update!(sync_blocks_rss: false) + feed.reload + + PublishFeedJob.stub(:s3_client, stub_client) do + PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do + # no error raised + PublishingPipelineState.attempt!(feed.podcast, perform_later: false) + + assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort + end end end end diff --git a/test/models/publishing_pipeline_state_test.rb b/test/models/publishing_pipeline_state_test.rb index 820524a6d..176a9b9f0 100644 --- a/test/models/publishing_pipeline_state_test.rb +++ b/test/models/publishing_pipeline_state_test.rb @@ -273,6 +273,41 @@ end end + describe "retry!" do + let(:podcast) { create(:podcast) } + let(:public_feed) { podcast.default_feed } + let(:private_feed) { create(:apple_feed, podcast: podcast) } + let(:apple_feed) { private_feed } + let(:apple_config) { private_feed.apple_config } + let(:apple_publisher) { apple_config.build_publisher } + + it 'sets the status to "retry"' do + episode = build(:uploaded_apple_episode, show: apple_publisher.show) + + # it does not trigger an exception + episode.apple_episode_delivery_status.update(asset_processing_attempts: 1) + + pqi = nil + PublishFeedJob.stub_any_instance(:save_file, nil) do + PublishAppleJob.stub(:do_perform, ->(*args) { raise Apple::AssetStateTimeoutError.new([episode]) }) do + pqi = PublishingQueueItem.ensure_queued!(podcast) + PublishingPipelineState.attempt!(podcast, perform_later: false) + end + end + + assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status) + assert_equal "retry", pqi.reload.last_pipeline_state + + # it retries + PublishingPipelineState.retry_failed_pipelines! + assert_equal ["created", "started", "error_apple", "retry", "created"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status) + res_pqi = PublishingQueueItem.current_unfinished_item(podcast) + + assert res_pqi.id > pqi.id + assert_equal "created", res_pqi.last_pipeline_state + end + end + describe "complete!" do it 'sets the status to "complete"' do PublishFeedJob.stub_any_instance(:save_file, nil) do