Quiet job continuation #1133

Open · wants to merge 16 commits into base: main
25 changes: 19 additions & 6 deletions app/jobs/publish_feed_job.rb
@@ -32,6 +32,11 @@ def publish_apple(podcast, feed)
res = PublishAppleJob.do_perform(podcast.apple_config)
PublishingPipelineState.publish_apple!(podcast)
res
rescue Apple::AssetStateTimeoutError => e
# Not strictly a 'fail state' because we want to retry this job
PublishingPipelineState.error_apple!(podcast)
Rails.logger.send(e.log_level, e.message, {podcast_id: podcast.id})
raise e if podcast.apple_config.sync_blocks_rss
rescue => e
if podcast.apple_config.sync_blocks_rss
fail_state(podcast, "apple", e)
@@ -49,16 +54,24 @@ def publish_rss(podcast, feed)
fail_state(podcast, "rss", e)
end

def apple_timeout_log_level(error)
error.try(:log_level) || :error
end

def fail_state(podcast, type, error)
(pipeline_method, log_level) = case type
when "apple" then [:error_apple!, :warn]
when "rss" then [:error_rss!, :warn]
when "apple_timeout", "error" then [:error!, :error]
end
(pipeline_method, log_level, raise_exception) =
case type
when "apple" then [:error_apple!, :warn, true]
when "rss" then [:error_rss!, :warn, true]
when "apple_timeout"
level = apple_timeout_log_level(error)
[:retry!, level, level == :fatal || level == :error]

Member:

I might refactor to %i[error fatal].include?(level), but this makes sense as the place to decide to raise the error or not.
Another question that's not a request for change - does it feel like this is info I wish was on the error somehow, like how the error has a log level, can it also have a raise or not? maybe that is not a good responsibility for the error, idk.

Member Author:

Re: clarity, Good point!

Re: error interface responsibility. Hrm, yes, I see what you're getting at. In essence, we're already responding to the error as the controlling parameter here, so why not push that should_raise? responsibility up to the error class itself...
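
A minimal sketch of that idea, assuming a hypothetical should_raise? predicate on Apple::AssetStateTimeoutError (not part of this diff) that reuses the log_level method added later in this PR:

# Hypothetical sketch only -- should_raise? is not part of this PR.
module Apple
  class AssetStateTimeoutError
    # log_level already maps attempts to :warn / :error / :fatal
    # (see asset_state_timeout_error.rb below).

    # Let the error, rather than fail_state, decide whether the
    # pipeline should re-raise it.
    def should_raise?
      %i[error fatal].include?(log_level)
    end
  end
end

# fail_state could then collapse the apple_timeout branch to:
#   when "apple_timeout" then [:retry!, error.log_level, error.should_raise?]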

when "error" then [:error!, :error, true]
end

PublishingPipelineState.public_send(pipeline_method, podcast)
Rails.logger.send(log_level, error.message, {podcast_id: podcast.id})
raise error
raise error if raise_exception
end

def save_file(podcast, feed, options = {})
11 changes: 11 additions & 0 deletions app/models/apple/asset_state_timeout_error.rb
@@ -12,5 +12,16 @@ def initialize(episodes)
def episode_ids
episodes.map(&:feeder_id)
end

def log_level
case attempts
when 0..4
:warn
when 5
:error
else
:fatal
end
end
end
end
57 changes: 57 additions & 0 deletions app/models/apple/episode_delivery_status.rb
@@ -2,6 +2,63 @@ module Apple
class EpisodeDeliveryStatus < ApplicationRecord
belongs_to :episode, -> { with_deleted }, class_name: "::Episode"

def self.change_log(apple_episode_delivery_statuses)
return [] unless apple_episode_delivery_statuses&.any?

statuses = apple_episode_delivery_statuses.to_a
changes = []

tracked_attributes = column_names - ["id", "created_at"]

latest_values = {}

statuses.reverse_each do |status|
tracked_attributes.each do |attr|
value = status.send(attr)

# Only record the change if we haven't seen this attribute before
# or if the value is different from the most recent one
if !latest_values.key?(attr) || latest_values[attr] != value
latest_values[attr] = value

# Format the change message
message = format_change_message(attr, value, status.created_at)
changes.unshift(message) unless message.nil?
end
end
end

changes
end

def self.format_change_message(attribute, value, timestamp)
return nil if value.nil?

formatted_time = timestamp.strftime("%Y-%m-%d %H:%M:%S")
"#{formatted_time}: #{attribute.humanize} changed to #{value}"
end

def self.measure_asset_processing_duration(apple_episode_delivery_statuses, relative_timestamp)
return [] unless apple_episode_delivery_statuses&.any?

statuses = apple_episode_delivery_statuses.to_a

last_status = statuses.shift

return [nil, measure_asset_processing_duration(statuses, last_status.created_at)].flatten unless last_status&.asset_processing_attempts.to_i.positive?

end_status = while (status = statuses.shift)
break status if status.asset_processing_attempts.to_i.zero?
end

return [nil].flatten unless end_status

[
relative_timestamp - end_status.created_at,
measure_asset_processing_duration(statuses, end_status.created_at)
].flatten
end

def self.update_status(episode, attrs)
new_status = (episode.apple_episode_delivery_status&.dup || default_status(episode))
new_status.assign_attributes(attrs)
12 changes: 2 additions & 10 deletions app/models/concerns/apple_delivery.rb
@@ -40,23 +40,15 @@ def apple_needs_delivery?
end

def apple_needs_delivery!
apple_update_delivery_status(delivered: false)
apple_update_delivery_status(delivered: false, asset_processing_attempts: 0)
end

def apple_has_delivery!
apple_update_delivery_status(delivered: true)
end

def measure_asset_processing_duration
statuses = apple_episode_delivery_statuses.to_a

last_status = statuses.shift
return nil unless last_status&.asset_processing_attempts.to_i.positive?

start_status = statuses.find { |status| status.asset_processing_attempts.to_i.zero? }
return nil unless start_status

Time.now - start_status.created_at
Apple::EpisodeDeliveryStatus.measure_asset_processing_duration(apple_episode_delivery_statuses, Time.now.utc).first
end

def apple_prepare_for_delivery!
11 changes: 8 additions & 3 deletions app/models/publishing_pipeline_state.rb
@@ -1,6 +1,6 @@
class PublishingPipelineState < ApplicationRecord
TERMINAL_STATUSES = [:complete, :error, :expired].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired].freeze
TERMINAL_STATUSES = [:complete, :error, :expired, :retry].freeze
TERMINAL_FAILURE_STATUSES = [:error, :expired, :retry].freeze
UNIQUE_STATUSES = TERMINAL_STATUSES + [:created, :started]

# Handle the max timeout for a publishing pipeline: Pub RSS job + Pub Apple job + a few extra minutes of flight
@@ -53,7 +53,8 @@ class PublishingPipelineState < ApplicationRecord
:error,
:expired,
:error_apple,
:error_rss
:error_rss,
:retry
]

validate :podcast_ids_match
@@ -170,6 +171,10 @@ def self.expire!(podcast)
state_transition(podcast, :expired)
end

def self.retry!(podcast)
state_transition(podcast, :retry)
end

def self.expire_pipelines!
Podcast.with_deleted.where(id: expired_pipelines.select(:podcast_id)).each do |podcast|
Rails.logger.tagged("PublishingPipeLineState.expire_pipelines!", "Podcast:#{podcast.id}") do
79 changes: 62 additions & 17 deletions test/jobs/publish_feed_job_test.rb
@@ -99,12 +99,16 @@
end

describe "publishing to apple" do
let(:podcast) { create(:podcast) }
let(:public_feed) { podcast.default_feed }
let(:private_feed) { create(:apple_feed, podcast: podcast) }
let(:apple_feed) { private_feed }
let(:apple_config) { podcast.apple_config }
let(:apple_config) { private_feed.apple_config }
let(:apple_publisher) { apple_config.build_publisher }

before do
assert private_feed.persisted?
assert podcast.reload.apple_config.present?
assert apple_config.persisted?
end

describe "#perform" do
@@ -151,30 +155,55 @@
PublishingQueueItem.create!(podcast: feed.podcast)
end

it "raises an error if the apple publishing fails" do
let(:episode1) { build(:uploaded_apple_episode, show: apple_publisher.show) }
let(:episode2) { build(:uploaded_apple_episode, show: apple_publisher.show) }
let(:episodes) { [episode1, episode2] }

it "logs message if the apple publishing times out" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled

PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
assert_raises(RuntimeError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
expected_level_for_timeouts = [
[0, 40],
[1, 40],
[2, 40],
[3, 40],
[4, 40],
[5, 50],
[6, 60]
]

expected_level_for_timeouts.each do |(attempts, level)|
# simulate an episode waiting n times
episodes.first.apple_episode_delivery_status.update(asset_processing_attempts: attempts)

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new(episodes) }) do
lines = capture_json_logs do
PublishingQueueItem.ensure_queued!(feed.podcast)
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)
rescue
nil
end

log = lines.find { |l| l["msg"].include?("Timeout waiting for asset state change") }
assert log.present?
assert_equal level, log["level"]

assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(:id).pluck(:status)
end
end
end
end

it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do
it "raises an error if the apple publishing times out" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled
apple_feed.apple_config.update!(sync_blocks_rss: false)
feed.reload

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
# no error raised
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do
assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status)
end
end

@@ -185,7 +214,23 @@
PublishAppleJob.stub(:do_perform, ->(*, **) { raise Apple::AssetStateTimeoutError.new([]) }) do
assert_raises(Apple::AssetStateTimeoutError) { PublishingPipelineState.attempt!(feed.podcast, perform_later: false) }

assert_equal ["created", "started", "error", "error_apple"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.order(id: :asc).pluck(:status)
end
end

it "does not raise an error if the apple publishing fails and apple sync does not block rss publishing" do
assert apple_feed.apple_config.present?
assert apple_feed.apple_config.publish_enabled
apple_feed.apple_config.update!(sync_blocks_rss: false)
feed.reload

PublishFeedJob.stub(:s3_client, stub_client) do
PublishAppleJob.stub(:do_perform, ->(*, **) { raise "some apple error" }) do
# no error raised
PublishingPipelineState.attempt!(feed.podcast, perform_later: false)

assert_equal ["created", "started", "error_apple", "published_rss", "published_rss", "published_rss", "complete"].sort, PublishingPipelineState.where(podcast: feed.podcast).latest_pipelines.pluck(:status).sort
end
end
end
end
35 changes: 35 additions & 0 deletions test/models/publishing_pipeline_state_test.rb
@@ -273,6 +273,41 @@
end
end

describe "retry!" do
let(:podcast) { create(:podcast) }
let(:public_feed) { podcast.default_feed }
let(:private_feed) { create(:apple_feed, podcast: podcast) }
let(:apple_feed) { private_feed }
let(:apple_config) { private_feed.apple_config }
let(:apple_publisher) { apple_config.build_publisher }

it 'sets the status to "retry"' do
episode = build(:uploaded_apple_episode, show: apple_publisher.show)

# it does not trigger an exception
episode.apple_episode_delivery_status.update(asset_processing_attempts: 1)

pqi = nil
PublishFeedJob.stub_any_instance(:save_file, nil) do
PublishAppleJob.stub(:do_perform, ->(*args) { raise Apple::AssetStateTimeoutError.new([episode]) }) do
pqi = PublishingQueueItem.ensure_queued!(podcast)
PublishingPipelineState.attempt!(podcast, perform_later: false)
end
end

assert_equal ["created", "started", "error_apple", "retry"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status)
assert_equal "retry", pqi.reload.last_pipeline_state

# it retries
PublishingPipelineState.retry_failed_pipelines!
assert_equal ["created", "started", "error_apple", "retry", "created"], PublishingPipelineState.where(podcast: podcast).order(:id).pluck(:status)
res_pqi = PublishingQueueItem.current_unfinished_item(podcast)

assert res_pqi.id > pqi.id
assert_equal "created", res_pqi.last_pipeline_state
end
end

describe "complete!" do
it 'sets the status to "complete"' do
PublishFeedJob.stub_any_instance(:save_file, nil) do