diff --git a/backend/src/routes/videos.ts b/backend/src/routes/videos.ts index b60abae..871ffbf 100644 --- a/backend/src/routes/videos.ts +++ b/backend/src/routes/videos.ts @@ -8,6 +8,7 @@ import { updateTier, getVideoByMapAndPlayer, updateVideoPB, + updateTranscodedKey, } from "../services/db"; import { uploadObject, @@ -19,6 +20,7 @@ import { completeMultipartUpload, abortMultipartUpload, getVideoStream, + transcodeVideo, } from "../services/rustfs"; import { parseMtvFile } from "../services/mtv-parser"; import { getMapInfo } from "../services/momentum-api"; @@ -343,6 +345,35 @@ export const videoRoutes = new Elysia({ prefix: "/api/videos" }) }; }) + .post("/:id/transcode", async ({ params: { id } }) => { + const video = getVideoById(id); + if (!video) return status(404, { error: "Not found" }); + + if (video.transcodedKey) { + return { + ...video, + videoUrl: getStreamUrl(video.id), + mtvUrl: getObjectUrl(video.mtvKey), + thumbnailUrl: video.thumbnailKey + ? getObjectUrl(video.thumbnailKey) + : undefined, + }; + } + + const transcodedKey = await transcodeVideo(video.videoKey); + updateTranscodedKey(id, transcodedKey); + + return { + ...video, + transcodedKey, + videoUrl: getStreamUrl(video.id), + mtvUrl: getObjectUrl(video.mtvKey), + thumbnailUrl: video.thumbnailKey + ? getObjectUrl(video.thumbnailKey) + : undefined, + }; + }) + .get("/:id/stream", async ({ params: { id }, request, set }) => { const video = getVideoById(id); if (!video) return status(404, { error: "Not found" }); @@ -350,7 +381,10 @@ export const videoRoutes = new Elysia({ prefix: "/api/videos" }) const rangeHeader = request.headers.get("Range"); try { - const result = await getVideoStream(video.videoKey, rangeHeader); + const result = await getVideoStream( + video.transcodedKey ?? video.videoKey, + rangeHeader, + ); const headers = new Headers(); headers.set("Content-Type", result.contentType); @@ -465,6 +499,8 @@ export const videoRoutes = new Elysia({ prefix: "/api/videos" }) deleteObject(video.mtvKey), ]; if (video.thumbnailKey) deletes.push(deleteObject(video.thumbnailKey)); + if (video.transcodedKey) + deletes.push(deleteObject(video.transcodedKey)); await Promise.all(deletes); deleteVideoById(id); diff --git a/backend/src/services/db.ts b/backend/src/services/db.ts index 45ceeba..0326715 100644 --- a/backend/src/services/db.ts +++ b/backend/src/services/db.ts @@ -57,6 +57,9 @@ export function initDb(): void { if (!columns.some((col) => col.name === "previous_pbs")) { db.exec("ALTER TABLE videos ADD COLUMN previous_pbs TEXT"); } + if (!columns.some((col) => col.name === "transcoded_key")) { + db.exec("ALTER TABLE videos ADD COLUMN transcoded_key TEXT"); + } } function getDb(): Database { @@ -142,6 +145,7 @@ function rowToEntry(row: any): VideoEntry { videoKey: row.video_key, mtvKey: row.mtv_key, thumbnailKey: row.thumbnail_key ?? undefined, + transcodedKey: row.transcoded_key ?? undefined, tier: row.tier ?? undefined, mapId: row.map_id ?? undefined, jsonStats: row.json_stats ?? undefined, @@ -197,7 +201,8 @@ export function updateVideoPB( map_id = ?, json_stats = ?, created_at = ?, - previous_pbs = ? + previous_pbs = ?, + transcoded_key = ? WHERE id = ?`, ).run( entry.title, @@ -216,10 +221,22 @@ export function updateVideoPB( entry.jsonStats ?? null, entry.createdAt, JSON.stringify(previousPbs), + entry.transcodedKey ?? null, id, ); } +export function updateTranscodedKey( + id: string, + transcodedKey: string | null, +): boolean { + const d = getDb(); + const result = d + .prepare("UPDATE videos SET transcoded_key = ? WHERE id = ?") + .run(transcodedKey, id); + return result.changes > 0; +} + export function updateTier(id: string, tier: number | null): boolean { const d = getDb(); const result = d diff --git a/backend/src/services/rustfs.ts b/backend/src/services/rustfs.ts index 1f54aec..7952fd0 100644 --- a/backend/src/services/rustfs.ts +++ b/backend/src/services/rustfs.ts @@ -9,6 +9,9 @@ import { GetObjectCommand, } from "@aws-sdk/client-s3"; import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; +import { tmpdir } from "os"; +import { join } from "path"; +import { unlink, writeFile } from "fs/promises"; const RUSTFS_ENDPOINT = process.env.RUSTFS_ENDPOINT || "http://localhost:9000"; const RUSTFS_ACCESS_KEY = process.env.RUSTFS_ACCESS_KEY || "minioadmin"; @@ -217,3 +220,79 @@ export async function getVideoStream( export function getStreamUrl(videoId: string): string { return `/api/videos/${videoId}/stream`; } + +export async function transcodeVideo(originalKey: string): Promise { + const s3 = getRustFsClient(); + + // Download original from S3 to temp file + const inputPath = join(tmpdir(), `transcode-input-${Date.now()}`); + const outputPath = join(tmpdir(), `transcode-output-${Date.now()}.mp4`); + + try { + const getCmd = new GetObjectCommand({ + Bucket: RUSTFS_BUCKET, + Key: originalKey, + }); + const response = await s3.send(getCmd); + if (!response.Body) { + throw new Error("Empty response body from RustFS"); + } + const bytes = await response.Body.transformToByteArray(); + await writeFile(inputPath, bytes); + + // Run ffmpeg to transcode + const proc = Bun.spawn( + [ + "ffmpeg", + "-i", + inputPath, + "-c:v", + "libx264", + "-preset", + "slow", + "-crf", + "18", + "-c:a", + "aac", + "-b:a", + "192k", + "-movflags", + "+faststart", + "-y", + outputPath, + ], + { + stdout: "pipe", + stderr: "pipe", + }, + ); + + const exitCode = await proc.exited; + if (exitCode !== 0) { + const stderr = await new Response(proc.stderr).text(); + throw new Error(`ffmpeg exited with code ${exitCode}: ${stderr}`); + } + + // Compute the transcoded key (replace extension with .mp4) + const lastDot = originalKey.lastIndexOf("."); + const transcodedKey = + lastDot !== -1 + ? originalKey.substring(0, lastDot) + ".mp4" + : originalKey + ".mp4"; + + // Upload transcoded file to S3 + const file = Bun.file(outputPath); + const buffer = Buffer.from(await file.arrayBuffer()); + await uploadObject(transcodedKey, buffer, "video/mp4"); + + return transcodedKey; + } finally { + // Clean up temp files + try { + await unlink(inputPath); + } catch {} + try { + await unlink(outputPath); + } catch {} + } +} diff --git a/backend/src/types/video.ts b/backend/src/types/video.ts index feab933..ae748d6 100644 --- a/backend/src/types/video.ts +++ b/backend/src/types/video.ts @@ -51,6 +51,7 @@ export interface VideoEntry { videoKey: string; mtvKey: string; thumbnailKey?: string; + transcodedKey?: string; tier?: number | null; mapId?: number | null; jsonStats?: string; diff --git a/frontend/src/components/UploadForm.tsx b/frontend/src/components/UploadForm.tsx index 405e55a..956507b 100644 --- a/frontend/src/components/UploadForm.tsx +++ b/frontend/src/components/UploadForm.tsx @@ -259,57 +259,82 @@ export default function UploadForm() { const { parts, uploadId } = initRes.presignedUrls; const totalSize = videoFile.size; const CHUNK_SIZE = 80 * 1024 * 1024; + const CONCURRENT_UPLOADS = 3; const uploadedParts: { partNumber: number; eTag: string }[] = []; + const progressPerChunk: number[] = new Array(parts.length).fill(0); - for (let i = 0; i < parts.length; i++) { - const part = parts[i]!; - const start = i * CHUNK_SIZE; + const uploadChunk = (index: number) => { + const part = parts[index]!; + const start = index * CHUNK_SIZE; const end = Math.min(start + CHUNK_SIZE, totalSize); const chunk = videoFile.slice(start, end); - const eTag = await new Promise((resolve, reject) => { - const xhr = new XMLHttpRequest(); - xhr.upload.addEventListener("progress", (e) => { - if (e.lengthComputable) { - const chunkProgress = e.loaded / e.total; - const overallUploaded = start + e.loaded; - setProgress( - 5 + - Math.round( - (overallUploaded / totalSize) * 90, - ), - ); - } - }); - xhr.addEventListener("load", () => { - if (xhr.status >= 200 && xhr.status < 300) { - const etag = xhr.getResponseHeader("ETag"); - if (etag) resolve(etag); - else - reject( - new Error("No ETag returned from upload"), + return new Promise<{ partNumber: number; eTag: string }>( + (resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.upload.addEventListener("progress", (e) => { + if (e.lengthComputable) { + progressPerChunk[index] = e.loaded; + const total = progressPerChunk.reduce( + (a, b) => a + b, + 0, ); - } else { - reject( - new Error( - `Upload failed with status ${xhr.status}`, - ), - ); - } - }); - xhr.addEventListener("error", () => - reject(new Error("Upload failed")), - ); - xhr.open("PUT", part.url); - xhr.setRequestHeader( - "Content-Type", - videoFile.type || "video/webm", - ); - xhr.send(chunk); - }); + setProgress( + 5 + Math.round((total / totalSize) * 90), + ); + } + }); + xhr.addEventListener("load", () => { + if (xhr.status >= 200 && xhr.status < 300) { + const etag = xhr.getResponseHeader("ETag"); + if (etag) + resolve({ + partNumber: part.partNumber, + eTag: etag, + }); + else + reject( + new Error( + "No ETag returned from upload", + ), + ); + } else { + reject( + new Error( + `Upload failed with status ${xhr.status}`, + ), + ); + } + }); + xhr.addEventListener("error", () => + reject(new Error("Upload failed")), + ); + xhr.open("PUT", part.url); + xhr.setRequestHeader( + "Content-Type", + videoFile.type || "video/webm", + ); + xhr.send(chunk); + }, + ); + }; - uploadedParts.push({ partNumber: part.partNumber, eTag }); - } + let nextIndex = 0; + const uploadNext = async (): Promise => { + if (nextIndex >= parts.length) return; + const index = nextIndex++; + const result = await uploadChunk(index); + uploadedParts.push(result); + await uploadNext(); + }; + + await Promise.all( + Array.from( + { length: Math.min(CONCURRENT_UPLOADS, parts.length) }, + () => uploadNext(), + ), + ); + uploadedParts.sort((a, b) => a.partNumber - b.partNumber); setProgress(98); await uploadVideoComplete(initRes.id, uploadedParts, uploadId);