Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/models/stream_recording.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class StreamRecording < ApplicationRecord
serialize :record_hours, coder: JSON

belongs_to :podcast, -> { with_deleted }, touch: true, optional: true
has_many :stream_resources, -> { order("start_at DESC") }, dependent: :destroy

scope :active, ->(now = Time.now) { status_enabled.where("end_date IS NULL OR end_date > ?", now) }
scope :recording, ->(now = Time.now) { active.where("start_date > ?", now) }
Expand All @@ -35,7 +36,6 @@ def self.config
end_date: s.end_date,
record_days: s.record_days,
record_hours: s.record_hours,
job_id: "#{s.podcast_id}/#{s.id}/:date/:hour",
callback: PorterUtils.callback_sqs
}
end
Expand Down
71 changes: 71 additions & 0 deletions app/models/stream_resource.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
class StreamResource < ApplicationRecord
BUFFER_SECONDS = 10

enum :status, %w[started created processing complete error retrying cancelled invalid].to_enum_h, prefix: true

belongs_to :stream_recording, -> { with_deleted }, touch: true, optional: true
has_one :podcast, through: :stream_recording
has_one :task, -> { order(id: :desc) }, as: :owner
has_many :tasks, as: :owner

validates :start_at, presence: true
validates :end_at, presence: true, comparison: {greater_than: :start_at}
validates :actual_start_at, presence: true
validates :actual_end_at, presence: true, comparison: {greater_than: :actual_start_at}
validates :original_url, presence: true

after_initialize :set_defaults
before_validation :set_defaults

acts_as_paranoid

# find/build for a specific oxbow job_id
# <podcast_id>/<stream_recording_id>/<start_time>/<end_time>/<guid>.mp3
def self.decode(str)
parts = str.split("/")
podcast_id = parts[0].to_i
recording_id = parts[1].to_i
start_at = safe_parse_time(parts[2])
end_at = safe_parse_time(parts[3])
return unless podcast_id > 0 && recording_id > 0 && start_at && end_at

rec = StreamRecording.find_by_id(recording_id)
return unless rec

res = rec.stream_resources.find_by(start_at: start_at, end_at: end_at)
return res if res

rec.stream_resources.build(start_at: start_at, end_at: end_at)
end

# NOTE: called twice, because podcast won't be there on initialize
def set_defaults
set_default(:status, "created")
set_default(:guid, SecureRandom.uuid)
set_default(:url, published_url)
end

def file_name
File.basename(URI.parse(original_url).path) if original_url.present?
end

def published_path
"#{podcast.path}/#{stream_resource_path}" if podcast
end

def published_url
"#{podcast.base_published_url}/#{stream_resource_path}" if podcast
end

private

def stream_resource_path
"streams/#{guid}/#{file_name}"
end

def safe_parse_time(str)
str&.to_time
rescue
nil
end
end
30 changes: 30 additions & 0 deletions db/migrate/20251218154550_create_stream_resources.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
class CreateStreamResources < ActiveRecord::Migration[7.2]
def change
create_table :stream_resources do |t|
t.references :stream_recording, index: true, foreign_key: true

# timeframe we were trying to capture, vs actually recorded
t.timestamp :start_at, index: true
t.timestamp :end_at, index: true
t.timestamp :actual_start_at
t.timestamp :actual_end_at

# file locations
t.string :guid
t.string :url
t.string :original_url

# metadata
t.string :status
t.string :mime_type
t.integer :file_size
t.integer :bit_rate
t.decimal :sample_rate
t.integer :channels
t.decimal :duration

t.timestamps
t.timestamp :deleted_at
end
end
end
27 changes: 26 additions & 1 deletion db/schema.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions test/factories/oxbow_callback_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
FactoryBot.define do
factory :oxbow_job_id, class: String do
Id { "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3" }
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_job_received, class: Hash do
Time { "2025-12-17T14:51:16.490Z" }
Timestamp { 1765983076.49 }
JobReceived do
Job { build(:oxbow_job_id) }
Execution { {Id: "arn:aws:states:the-execution-arn"} }
State "RECEIVED"
end
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_ffmpeg_destination, class: Hash do
Mode { "AWS/S3" }
BucketName { "prx-feed-testing" }
ObjectKey { "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3" }
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_ffmpeg_task, class: Hash do
Type { "FFmpeg" }
FFmpeg do
Inputs { "-t 4350 -i \"http://some.stream/url/stream_name.mp3\"" }
GlobalOptions { "" }
InputFileOptions { "" }
OutputFileOptions { "" }
Outputs { [{Format: "mp3", Destination: build(:oxbow_ffmpeg_destination)}] }
end
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_ffmpeg_result, class: Hash do
Task { "FFmpeg" }
Time { "2025-12-17T16:02:23.826Z" }
Timestamp { 1765987343.826 }
FFmpeg do
Outputs do
[
Mode: "AWS/S3",
BucketName: "prx-feed-testing",
ObjectKey: "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3",
Duration: 4350000,
Size: 12345678
]
end
end
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_task_result, class: Hash do
Time { "2025-12-17T16:02:23.210Z" }
Timestamp { 1765987343.21 }
Task { build(:oxbow_ffmpeg_task) }
TaskResult do
Job { build(:oxbow_job_id) }
Execution { {Id: "arn:aws:states:the-execution-arn"} }
Result build(:oxbow_ffmpeg_result)
end
initialize_with { attributes.with_indifferent_access }
end

factory :oxbow_job_results, class: Hash do
Time { "2025-12-17T16:02:23.826Z" }
Timestamp { 1765987343.826 }
JobResult do
Job { build(:oxbow_job_id) }
Execution { {Id: "arn:aws:states:the-execution-arn"} }
State { "DONE" }
FailedTasks { [] }
TaskResults { [build(:oxbow_ffmpeg_result)] }
end
initialize_with { attributes.with_indifferent_access }
end
end
18 changes: 18 additions & 0 deletions test/factories/stream_resource_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FactoryBot.define do
factory :stream_resource do
start_at { "2025-12-18T13:00:00Z" }
end_at { "2025-12-18T14:00:00Z" }
actual_start_at { "2025-12-18T12:59:50Z" }
actual_end_at { "2025-12-18T14:00:10Z" }

original_url { "s3://prx-testing/test/audio.mp3" }

status { "complete" }
mime_type { "audio/mpeg" }
file_size { 774059 }
bit_rate { 128 }
sample_rate { 44100 }
channels { 2 }
duration { 3620.0 }
end
end
92 changes: 92 additions & 0 deletions test/models/stream_resource_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
require "test_helper"

describe StreamResource do
let(:podcast) { build_stubbed(:podcast) }
let(:resource) { build_stubbed(:stream_resource, podcast: podcast) }

describe ".decode" do
let(:rec) { create(:stream_recording) }
let(:rec2) { create(:stream_recording) }

it "finds or builds stream resources" do
id = "1234/#{rec.id}/2025-12-17T15:00Z/2025-12-17T16:00Z/some-guid.mp3"
res = StreamResource.decode(id)

assert res.new_record?
assert_equal rec.id, res.stream_recording_id
assert_equal "2025-12-17T15:00Z".to_time, res.start_at
assert_equal "2025-12-17T16:00Z".to_time, res.end_at

# skip validations - we're just testing finding my stream+dates
res.save(validate: false)
assert res.persisted?

# finding again returns the same resource
assert_equal res, StreamResource.decode(id)
assert_equal res, StreamResource.decode(id.sub("some-guid.mp3", "other-guid.mp3"))

# but different recordings or dates are not new resources
id2 = id.sub("/#{rec.id}/", "/#{rec2.id}/")
assert StreamResource.decode(id2).new_record?
assert StreamResource.decode(id.sub("15:00Z", "15:30Z")).new_record?
assert StreamResource.decode(id.sub("16:00Z", "16:05Z")).new_record?

# bad data is nil
assert_nil StreamResource.decode(id.sub("1234/", "abcd/"))
assert_nil StreamResource.decode(id.sub("/#{rec.id}/", "/abcd/"))
assert_nil StreamResource.decode(id.sub("17T15:00Z", ""))
assert_nil StreamResource.decode(id.sub("17T16:00Z", ""))
assert_nil StreamResource.decode("1234/5678/something")
assert_nil StreamResource.decode("whatev")
end
end

describe "#set_defaults" do
it "sets unchanged defaults" do
res = StreamResource.new(podcast: podcast)
assert_equal "created", res.status
assert res.guid.present?
assert res.url.present?
refute res.changed?
end
end

describe "#file_name" do
it "parses the original url" do
assert_equal "audio.mp3", resource.file_name

resource.original_url = "http://some.where/the/file.name.here#and?other&stuff=1"
assert_equal "file.name.here", resource.file_name

resource.original_url = ""
assert_nil resource.file_name
end
end

describe "#published_path" do
it "includes the podcast prefix" do
assert_equal "#{podcast.id}/streams/#{resource.guid}/audio.mp3", resource.published_path
end
end

describe "#published_url" do
it "includes the podcast http url" do
assert_equal "https://f.prxu.org/#{resource.published_path}", resource.published_url
end

it "sets the url field" do
# NOTE: podcast is nil after_initialize
assert_nil resource.url

# but will be there before validation
assert resource.valid?
refute_nil resource.url
assert_equal resource.published_url, resource.url

resource.guid = "some-other-guid"
resource.original_url = "http://some/other.filename"
assert resource.valid?
refute_equal resource.published_url, resource.url
end
end
end