Skip to content

Commit

Permalink
fix: handle generation progress
Browse files Browse the repository at this point in the history
  • Loading branch information
listlessbird committed Nov 7, 2024
1 parent 3cca99b commit 70dcbc4
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 123 deletions.
Binary file modified web/bun.lockb
Binary file not shown.
2 changes: 2 additions & 0 deletions web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"class-variance-authority": "^0.7.0",
"clsx": "^2.1.1",
"drizzle-orm": "^0.36.0",
"eventsource": "^2.0.2",
"framer-motion": "^11.11.7",
"lucide-react": "^0.451.0",
"next": "15.0.0-rc.0",
Expand All @@ -52,6 +53,7 @@
"zod": "^3.23.8"
},
"devDependencies": {
"@types/eventsource": "^1.1.15",
"@types/node": "^20",
"@types/react": "^18",
"@types/react-dom": "^18",
Expand Down
144 changes: 69 additions & 75 deletions web/src/app/(history)/history/(item)/[id]/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@ import { uploadVideoToR2 } from "@/lib/r2";
import { GeneratedAssetType } from "@/types";
import { revalidatePath } from "next/cache";

const TIMEOUT_MS = 5 * 60 * 1000;
const TIMEOUT_MS = 10 * 60 * 1000;
const RETRY_ATTEMPTS = 3;
const RETRY_DELAY_MS = 1000;
const RETRY_DELAY_MS = 2000;
const CONNECTION_TIMEOUT_MS = 30000;

class GenerationError extends Error {
constructor(message: string, public readonly details?: any) {
super(message);
this.name = "GenerationError";
}
}

async function fetchWithRetry(
url: string,
Expand All @@ -17,19 +25,37 @@ async function fetchWithRetry(
console.log(
`[fetchWithRetry] Attempt ${RETRY_ATTEMPTS - attempts + 1} for URL: ${url}`
);

try {
const response = await fetch(url, options);
const controller = new AbortController();
const timeoutId = setTimeout(
() => controller.abort(),
CONNECTION_TIMEOUT_MS
);

const response = await fetch(url, {
...options,
signal: controller.signal,
});

clearTimeout(timeoutId);

if (!response.ok) {
console.log(
`[fetchWithRetry] Failed attempt with status: ${response.status}`
);
throw new Error(`HTTP error! status: ${response.status}`);
}
console.log(`[fetchWithRetry] Successful response received`);

return response;
} catch (error) {
console.log(`[fetchWithRetry] Error during fetch:`, error);
if (attempts <= 1) throw error;

if (error instanceof Error && error.name === "AbortError") {
throw new GenerationError("Connection timeout exceeded");
}

if (attempts <= 1) {
throw new GenerationError("Failed to connect to renderer service", error);
}

console.log(`[fetchWithRetry] Retrying in ${RETRY_DELAY_MS}ms`);
await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY_MS));
return fetchWithRetry(url, options, attempts - 1);
Expand All @@ -43,11 +69,7 @@ export async function startGeneration({
}) {
console.log(`[startGeneration] Starting generation for asset:`, asset);
const { user } = await validateRequest();
if (!user) {
console.log(`[startGeneration] Authorization failed - no user found`);
throw new Error("Unauthorized");
}
console.log(`[startGeneration] User authorized:`, user.googleId);
if (!user) throw new Error("Unauthorized");

const encoder = new TextEncoder();
let abortController: AbortController | null = null;
Expand All @@ -63,6 +85,7 @@ export async function startGeneration({
JSON.stringify({
error: "Generation failed",
details: errorMessage,
recoverable: error instanceof GenerationError,
}) + "\n"
)
);
Expand All @@ -87,9 +110,9 @@ export async function startGeneration({
abortController?.abort();
}, TIMEOUT_MS);

console.log(
`[stream] Initiating renderer request to ${process.env.RENDERER_URL}/render/${asset.configId}/`
);
let lastProgress = 0;
let lastStage = "STARTING";

const response = await fetchWithRetry(
`${process.env.RENDERER_URL}/render/${asset.configId}/`,
{
Expand All @@ -106,11 +129,9 @@ export async function startGeneration({
);

clearTimeout(timeoutId);
console.log(`[stream] Renderer response received`);

if (!response.body) {
console.log(`[stream] No response body received from renderer`);
throw new Error("No response body received from renderer");
throw new GenerationError("No response body received from renderer");
}

const reader = response.body.getReader();
Expand All @@ -119,18 +140,10 @@ export async function startGeneration({

const heartbeatInterval = setInterval(() => {
const now = Date.now();
console.log(
`[stream] Heartbeat check - Time since last event: ${
now - lastEventTime
}ms`
);
if (now - lastEventTime > 30000) {
console.log(
`[stream] Connection stalled - no data received for 30s`
);
clearInterval(heartbeatInterval);
controller.error(
new Error("Connection stalled - no data received")
new GenerationError("Connection stalled - no data received")
);
}
}, 5000);
Expand All @@ -139,12 +152,9 @@ export async function startGeneration({
while (true) {
const { value, done } = await reader.read();

if (done) {
console.log(`[stream] Reader completed`);
break;
}
lastEventTime = Date.now();
if (done) break;

lastEventTime = Date.now();
buffer += new TextDecoder().decode(value, { stream: true });
const messages = buffer.split("\n\n");
buffer = messages.pop() || "";
Expand All @@ -156,57 +166,38 @@ export async function startGeneration({
const data = JSON.parse(message.slice(6));
console.log(`[stream] Received message:`, data);

// Update last known progress
if (data.progress) lastProgress = data.progress;
if (data.stage) lastStage = data.stage;

if (data.error) {
console.log(`[stream] Renderer error:`, data.error);
sendError(data.error);
return;
throw new GenerationError(data.error);
}

if (data.path) {
const lastProgress = data.progress || 0;
const lastStage = data.stage || "ENCODING";
console.log(
`[stream] Upload stage - Progress: ${lastProgress}, Stage: ${lastStage}`
);

sendStatus("Uploading to R2...", {
progress: Math.min(lastProgress + 5, 99),
stage: lastStage,
status: "Uploading to storage...",
});

try {
console.log(
`[stream] Starting R2 upload for configId:`,
asset.configId
);
const { url, signedUrl } = await uploadVideoToR2(
`${process.env.RENDERER_URL}/assets/${asset.configId}`,
asset.configId!
);
console.log(`[stream] R2 upload complete - URL:`, url);

console.log(`[stream] Storing video in database`);
await storeGeneratedVideo({
r2Url: url,
configId: asset.configId!,
userGoogleId: user.googleId,
});

sendStatus("complete", {
signedUrl,
progress: 100,
stage: "COMPLETE",
});
revalidatePath(`/history/${asset.configId}`);
console.log(
`[stream] Generation complete for configId:`,
asset.configId
);
} catch (error) {
console.log(`[stream] R2 upload error:`, error);
sendError(error);
}
const { url, signedUrl } = await uploadVideoToR2(
`${process.env.RENDERER_URL}/assets/${asset.configId}`,
asset.configId!
);

await storeGeneratedVideo({
r2Url: url,
configId: asset.configId!,
userGoogleId: user.googleId,
});

sendStatus("complete", {
signedUrl,
progress: 100,
stage: "COMPLETE",
});
revalidatePath(`/history/${asset.configId}`);
} else {
controller.enqueue(
encoder.encode(JSON.stringify(data) + "\n")
Expand All @@ -218,13 +209,16 @@ export async function startGeneration({
error,
message
);
if (error instanceof GenerationError) {
sendError(error);
return;
}
continue;
}
}
}
} finally {
clearInterval(heartbeatInterval);
console.log(`[stream] Stream processing completed`);
}
} catch (error) {
console.log(`[stream] Fatal stream error:`, error);
Expand Down
Loading

0 comments on commit 70dcbc4

Please sign in to comment.