From e50aee1e62e94b36f1b41e5d9ccd2e37cf76bc4a Mon Sep 17 00:00:00 2001 From: CallMeVerity Date: Wed, 3 Jun 2026 04:34:26 +0100 Subject: [PATCH] Stream download for transcode, multipart upload for large output --- backend/src/services/rustfs.ts | 80 ++++++++++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/backend/src/services/rustfs.ts b/backend/src/services/rustfs.ts index 7952fd0..01ba2dc 100644 --- a/backend/src/services/rustfs.ts +++ b/backend/src/services/rustfs.ts @@ -11,7 +11,8 @@ import { import { getSignedUrl } from "@aws-sdk/s3-request-presigner"; import { tmpdir } from "os"; import { join } from "path"; -import { unlink, writeFile } from "fs/promises"; +import { unlink } from "fs/promises"; +import { createWriteStream, createReadStream } from "fs"; const RUSTFS_ENDPOINT = process.env.RUSTFS_ENDPOINT || "http://localhost:9000"; const RUSTFS_ACCESS_KEY = process.env.RUSTFS_ACCESS_KEY || "minioadmin"; @@ -224,11 +225,11 @@ export function getStreamUrl(videoId: string): string { export async function transcodeVideo(originalKey: string): Promise { const s3 = getRustFsClient(); - // Download original from S3 to temp file const inputPath = join(tmpdir(), `transcode-input-${Date.now()}`); const outputPath = join(tmpdir(), `transcode-output-${Date.now()}.mp4`); try { + // Stream download from S3 to temp file (avoids loading whole file into RAM) const getCmd = new GetObjectCommand({ Bucket: RUSTFS_BUCKET, Key: originalKey, @@ -237,8 +238,33 @@ export async function transcodeVideo(originalKey: string): Promise { if (!response.Body) { throw new Error("Empty response body from RustFS"); } - const bytes = await response.Body.transformToByteArray(); - await writeFile(inputPath, bytes); + + const sdkStream = response.Body as any; + const webStream: ReadableStream = + sdkStream.transformToWebStream + ? sdkStream.transformToWebStream() + : sdkStream; + + const fileWriter = createWriteStream(inputPath); + const reader = webStream.getReader(); + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (!fileWriter.write(value)) { + await new Promise((resolve) => + fileWriter.once("drain", resolve), + ); + } + } + } finally { + fileWriter.end(); + reader.releaseLock(); + await new Promise((resolve) => + fileWriter.on("finish", resolve), + ); + } // Run ffmpeg to transcode const proc = Bun.spawn( @@ -280,10 +306,48 @@ export async function transcodeVideo(originalKey: string): Promise { ? originalKey.substring(0, lastDot) + ".mp4" : originalKey + ".mp4"; - // Upload transcoded file to S3 - const file = Bun.file(outputPath); - const buffer = Buffer.from(await file.arrayBuffer()); - await uploadObject(transcodedKey, buffer, "video/mp4"); + // Upload transcoded file to S3 using multipart for large files + const outputFile = Bun.file(outputPath); + const fileSize = outputFile.size; + const CHUNK_SIZE = 80 * 1024 * 1024; // 80MB chunks + + if (fileSize < CHUNK_SIZE) { + // Small enough for single upload + const buffer = Buffer.from(await outputFile.arrayBuffer()); + await uploadObject(transcodedKey, buffer, "video/mp4"); + } else { + // Multipart upload for large files + const { uploadId } = await createMultipartUpload( + transcodedKey, + "video/mp4", + ); + const numParts = Math.ceil(fileSize / CHUNK_SIZE); + const parts: { PartNumber: number; ETag: string }[] = []; + + for (let i = 0; i < numParts; i++) { + const start = i * CHUNK_SIZE; + const end = Math.min(start + CHUNK_SIZE, fileSize); + const chunk = Buffer.from( + await outputFile.slice(start, end).arrayBuffer(), + ); + + const partNumber = i + 1; + const uploadCmd = new UploadPartCommand({ + Bucket: RUSTFS_BUCKET, + Key: transcodedKey, + UploadId: uploadId, + PartNumber: partNumber, + Body: chunk, + }); + const result = await s3.send(uploadCmd); + parts.push({ + PartNumber: partNumber, + ETag: result.ETag!, + }); + } + + await completeMultipartUpload(transcodedKey, uploadId, parts); + } return transcodedKey; } finally {