diff --git a/app/models/stream_recording.rb b/app/models/stream_recording.rb index ad825158a..4d8a5357d 100644 --- a/app/models/stream_recording.rb +++ b/app/models/stream_recording.rb @@ -9,6 +9,7 @@ class StreamRecording < ApplicationRecord serialize :record_hours, coder: JSON belongs_to :podcast, -> { with_deleted }, touch: true, optional: true + has_many :stream_resources, -> { order("start_at DESC") }, dependent: :destroy scope :active, ->(now = Time.now) { status_enabled.where("end_date IS NULL OR end_date > ?", now) } scope :recording, ->(now = Time.now) { active.where("start_date > ?", now) } @@ -35,7 +36,6 @@ def self.config end_date: s.end_date, record_days: s.record_days, record_hours: s.record_hours, - job_id: "#{s.podcast_id}/#{s.id}/:date/:hour", callback: PorterUtils.callback_sqs } end diff --git a/app/models/stream_resource.rb b/app/models/stream_resource.rb new file mode 100644 index 000000000..dcb65568d --- /dev/null +++ b/app/models/stream_resource.rb @@ -0,0 +1,71 @@ +class StreamResource < ApplicationRecord + BUFFER_SECONDS = 10 + + enum :status, %w[started created processing complete error retrying cancelled invalid].to_enum_h, prefix: true + + belongs_to :stream_recording, -> { with_deleted }, touch: true, optional: true + has_one :podcast, through: :stream_recording + has_one :task, -> { order(id: :desc) }, as: :owner + has_many :tasks, as: :owner + + validates :start_at, presence: true + validates :end_at, presence: true, comparison: {greater_than: :start_at} + validates :actual_start_at, presence: true + validates :actual_end_at, presence: true, comparison: {greater_than: :actual_start_at} + validates :original_url, presence: true + + after_initialize :set_defaults + before_validation :set_defaults + + acts_as_paranoid + + # find/build for a specific oxbow job_id + # <podcast_id>/<stream_recording_id>/<start_at>/<end_at>/<guid>.mp3 + def self.decode(str) + parts = str.split("/") + podcast_id = parts[0].to_i + recording_id = parts[1].to_i + start_at = safe_parse_time(parts[2]) + end_at = 
safe_parse_time(parts[3]) + return unless podcast_id > 0 && recording_id > 0 && start_at && end_at + + rec = StreamRecording.find_by_id(recording_id) + return unless rec + + res = rec.stream_resources.find_by(start_at: start_at, end_at: end_at) + return res if res + + rec.stream_resources.build(start_at: start_at, end_at: end_at) + end + + # NOTE: called twice, because podcast won't be there on initialize + def set_defaults + set_default(:status, "created") + set_default(:guid, SecureRandom.uuid) + set_default(:url, published_url) + end + + def file_name + File.basename(URI.parse(original_url).path) if original_url.present? + end + + def published_path + "#{podcast.path}/#{stream_resource_path}" if podcast + end + + def published_url + "#{podcast.base_published_url}/#{stream_resource_path}" if podcast + end + + private + + def stream_resource_path + "streams/#{guid}/#{file_name}" + end + + def self.safe_parse_time(str) + str&.to_time + rescue + nil + end +end diff --git a/db/migrate/20251218154550_create_stream_resources.rb b/db/migrate/20251218154550_create_stream_resources.rb new file mode 100644 index 000000000..7f3c9c1c9 --- /dev/null +++ b/db/migrate/20251218154550_create_stream_resources.rb @@ -0,0 +1,30 @@ +class CreateStreamResources < ActiveRecord::Migration[7.2] + def change + create_table :stream_resources do |t| + t.references :stream_recording, index: true, foreign_key: true + + # timeframe we were trying to capture, vs actually recorded + t.timestamp :start_at, index: true + t.timestamp :end_at, index: true + t.timestamp :actual_start_at + t.timestamp :actual_end_at + + # file locations + t.string :guid + t.string :url + t.string :original_url + + # metadata + t.string :status + t.string :mime_type + t.integer :file_size + t.integer :bit_rate + t.decimal :sample_rate + t.integer :channels + t.decimal :duration + + t.timestamps + t.timestamp :deleted_at + end + end +end diff --git a/db/schema.rb b/db/schema.rb index afc9565ec..dc6cedf44 100644 --- 
a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.2].define(version: 2025_11_19_222707) do +ActiveRecord::Schema[7.2].define(version: 2025_12_18_154550) do # These are extensions that must be enabled in order to support this database enable_extension "plpgsql" enable_extension "uuid-ossp" @@ -476,6 +476,30 @@ t.index ["podcast_id"], name: "index_stream_recordings_on_podcast_id" end + create_table "stream_resources", force: :cascade do |t| + t.bigint "stream_recording_id" + t.datetime "start_at", precision: nil + t.datetime "end_at", precision: nil + t.datetime "actual_start_at", precision: nil + t.datetime "actual_end_at", precision: nil + t.string "guid" + t.string "url" + t.string "original_url" + t.string "status" + t.string "mime_type" + t.integer "file_size" + t.integer "bit_rate" + t.decimal "sample_rate" + t.integer "channels" + t.decimal "duration" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.datetime "deleted_at", precision: nil + t.index ["end_at"], name: "index_stream_resources_on_end_at" + t.index ["start_at"], name: "index_stream_resources_on_start_at" + t.index ["stream_recording_id"], name: "index_stream_resources_on_stream_recording_id" + end + create_table "subscribe_links", force: :cascade do |t| t.datetime "created_at", null: false t.datetime "updated_at", null: false @@ -543,4 +567,5 @@ add_foreign_key "publishing_pipeline_states", "publishing_queue_items" add_foreign_key "publishing_queue_items", "podcasts" add_foreign_key "stream_recordings", "podcasts" + add_foreign_key "stream_resources", "stream_recordings" end diff --git a/test/factories/oxbow_callback_factory.rb b/test/factories/oxbow_callback_factory.rb new file mode 100644 index 000000000..0c51f0b54 --- /dev/null +++ b/test/factories/oxbow_callback_factory.rb @@ -0,0 +1,79 @@ +FactoryBot.define do + factory :oxbow_job_id, class: String 
do + Id { "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3" } + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_job_received, class: Hash do + Time { "2025-12-17T14:51:16.490Z" } + Timestamp { 1765983076.49 } + JobReceived do + Job { build(:oxbow_job_id) } + Execution { {Id: "arn:aws:states:the-execution-arn"} } + State { "RECEIVED" } + end + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_ffmpeg_destination, class: Hash do + Mode { "AWS/S3" } + BucketName { "prx-feed-testing" } + ObjectKey { "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3" } + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_ffmpeg_task, class: Hash do + Type { "FFmpeg" } + FFmpeg do + Inputs { "-t 4350 -i \"http://some.stream/url/stream_name.mp3\"" } + GlobalOptions { "" } + InputFileOptions { "" } + OutputFileOptions { "" } + Outputs { [{Format: "mp3", Destination: build(:oxbow_ffmpeg_destination)}] } + end + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_ffmpeg_result, class: Hash do + Task { "FFmpeg" } + Time { "2025-12-17T16:02:23.826Z" } + Timestamp { 1765987343.826 } + FFmpeg do + Outputs do + [ + Mode: "AWS/S3", + BucketName: "prx-feed-testing", + ObjectKey: "1234/5678/2025-12-17T15:00Z/2025-12-17T16:00Z/27a8112d-b582-4d23-8d73-257e543d64a4.mp3", + Duration: 4350000, + Size: 12345678 + ] + end + end + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_task_result, class: Hash do + Time { "2025-12-17T16:02:23.210Z" } + Timestamp { 1765987343.21 } + Task { build(:oxbow_ffmpeg_task) } + TaskResult do + Job { build(:oxbow_job_id) } + Execution { {Id: "arn:aws:states:the-execution-arn"} } + Result { build(:oxbow_ffmpeg_result) } + end + initialize_with { attributes.with_indifferent_access } + end + + factory :oxbow_job_results, class: Hash do + Time { 
"2025-12-17T16:02:23.826Z" } + Timestamp { 1765987343.826 } + JobResult do + Job { build(:oxbow_job_id) } + Execution { {Id: "arn:aws:states:the-execution-arn"} } + State { "DONE" } + FailedTasks { [] } + TaskResults { [build(:oxbow_ffmpeg_result)] } + end + initialize_with { attributes.with_indifferent_access } + end +end diff --git a/test/factories/stream_resource_factory.rb b/test/factories/stream_resource_factory.rb new file mode 100644 index 000000000..64afb0759 --- /dev/null +++ b/test/factories/stream_resource_factory.rb @@ -0,0 +1,18 @@ +FactoryBot.define do + factory :stream_resource do + start_at { "2025-12-18T13:00:00Z" } + end_at { "2025-12-18T14:00:00Z" } + actual_start_at { "2025-12-18T12:59:50Z" } + actual_end_at { "2025-12-18T14:00:10Z" } + + original_url { "s3://prx-testing/test/audio.mp3" } + + status { "complete" } + mime_type { "audio/mpeg" } + file_size { 774059 } + bit_rate { 128 } + sample_rate { 44100 } + channels { 2 } + duration { 3620.0 } + end +end diff --git a/test/models/stream_resource_test.rb b/test/models/stream_resource_test.rb new file mode 100644 index 000000000..19bdb14a4 --- /dev/null +++ b/test/models/stream_resource_test.rb @@ -0,0 +1,92 @@ +require "test_helper" + +describe StreamResource do + let(:podcast) { build_stubbed(:podcast) } + let(:resource) { build_stubbed(:stream_resource, podcast: podcast) } + + describe ".decode" do + let(:rec) { create(:stream_recording) } + let(:rec2) { create(:stream_recording) } + + it "finds or builds stream resources" do + id = "1234/#{rec.id}/2025-12-17T15:00Z/2025-12-17T16:00Z/some-guid.mp3" + res = StreamResource.decode(id) + + assert res.new_record? + assert_equal rec.id, res.stream_recording_id + assert_equal "2025-12-17T15:00Z".to_time, res.start_at + assert_equal "2025-12-17T16:00Z".to_time, res.end_at + + # skip validations - we're just testing finding my stream+dates + res.save(validate: false) + assert res.persisted? 
+ + # finding again returns the same resource + assert_equal res, StreamResource.decode(id) + assert_equal res, StreamResource.decode(id.sub("some-guid.mp3", "other-guid.mp3")) + + # but different recordings or dates are not new resources + id2 = id.sub("/#{rec.id}/", "/#{rec2.id}/") + assert StreamResource.decode(id2).new_record? + assert StreamResource.decode(id.sub("15:00Z", "15:30Z")).new_record? + assert StreamResource.decode(id.sub("16:00Z", "16:05Z")).new_record? + + # bad data is nil + assert_nil StreamResource.decode(id.sub("1234/", "abcd/")) + assert_nil StreamResource.decode(id.sub("/#{rec.id}/", "/abcd/")) + assert_nil StreamResource.decode(id.sub("17T15:00Z", "")) + assert_nil StreamResource.decode(id.sub("17T16:00Z", "")) + assert_nil StreamResource.decode("1234/5678/something") + assert_nil StreamResource.decode("whatev") + end + end + + describe "#set_defaults" do + it "sets unchanged defaults" do + res = StreamResource.new(podcast: podcast) + assert_equal "created", res.status + assert res.guid.present? + assert res.url.present? + refute res.changed? + end + end + + describe "#file_name" do + it "parses the original url" do + assert_equal "audio.mp3", resource.file_name + + resource.original_url = "http://some.where/the/file.name.here#and?other&stuff=1" + assert_equal "file.name.here", resource.file_name + + resource.original_url = "" + assert_nil resource.file_name + end + end + + describe "#published_path" do + it "includes the podcast prefix" do + assert_equal "#{podcast.id}/streams/#{resource.guid}/audio.mp3", resource.published_path + end + end + + describe "#published_url" do + it "includes the podcast http url" do + assert_equal "https://f.prxu.org/#{resource.published_path}", resource.published_url + end + + it "sets the url field" do + # NOTE: podcast is nil after_initialize + assert_nil resource.url + + # but will be there before validation + assert resource.valid? 
+ refute_nil resource.url + assert_equal resource.published_url, resource.url + + resource.guid = "some-other-guid" + resource.original_url = "http://some/other.filename" + assert resource.valid? + refute_equal resource.published_url, resource.url + end + end +end